Occurrent 0.12.0 is now available with some better fine-grained configuration options for all MongoDB event stores as well as new cloud event converters for XStream and Jackson.
Changelog:
- Added ability to map event type to event name in subscriptions DSL from Kotlin
- Upgraded Kotlin to 1.5.31
- Upgraded spring-boot used in examples to 2.5.4
- Upgraded spring-mongodb to 3.2.5
- Upgraded the mongodb java driver to 4.3.2
- Upgraded project reactor to 3.4.10
- Upgraded cloudevents sdk to 2.2.0
- Minor tweak in ApplicationService extension function for Kotlin so that it no longer converts the Java stream into a temporary Kotlin sequence before converting it to a List
- Allow configuring (using the `EventStoreConfig` builder) whether transactional reads should be enabled or disabled for all MongoDB event stores. This is an advanced feature, and you almost always want to have it enabled. There are two reasons for disabling it:
  - There’s a bug/limitation on Atlas free tier clusters which yields an exception when reading a large number of events in a stream in a transaction. To work around this you could disable transactional reads. The exception takes this form:

        java.lang.IllegalStateException: state should be: open
            at com.mongodb.assertions.Assertions.isTrue(Assertions.java:79)
            at com.mongodb.internal.session.BaseClientSessionImpl.getServerSession(BaseClientSessionImpl.java:101)
            at com.mongodb.internal.session.ClientSessionContext.getSessionId(ClientSessionContext.java:44)
            at com.mongodb.internal.connection.ClusterClockAdvancingSessionContext.getSessionId(ClusterClockAdvancingSessionContext.java:46)
            at com.mongodb.internal.connection.CommandMessage.getExtraElements(CommandMessage.java:265)
            at com.mongodb.internal.connection.CommandMessage.encodeMessageBodyWithMetadata(CommandMessage.java:155)
            at com.mongodb.internal.connection.RequestMessage.encode(RequestMessage.java:138)
            at com.mongodb.internal.connection.CommandMessage.encode(CommandMessage.java:59)
            at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:268)
            at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:100)
            at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:490)
            at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:71)
            at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:253)
            at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:202)
            at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:118)
            at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:110)
            at com.mongodb.internal.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:268)
            at com.mongodb.internal.operation.QueryBatchCursor.hasNext(QueryBatchCursor.java:141)
            at com.mongodb.client.internal.MongoBatchCursorAdapter.hasNext(MongoBatchCursorAdapter.java:54)
            at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
            at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
            at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
            at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
            at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
            at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)

    It’s possible that this would work if you enable “no cursor timeout” on the query, but this is not allowed on Atlas free tier.
  - You’re set back by the performance penalty of transactions and are willing to sacrifice read consistency.

  If you disable transactional reads, you may end up with a mismatch between the version number in the `EventStream` and the last event returned from the event stream. This is because Occurrent does two reads to MongoDB when reading an event stream. First it finds the current version number of the stream (A), and secondly it queries for all events (B). If you disable transactional reads, then another thread might have written more events before the call to B has been made. Thus, the version number received from query A might be stale. This may or may not be a problem for your domain, but it’s generally recommended to keep transactional reads enabled. Configuration example:

      EventStoreConfig eventStoreConfig = new EventStoreConfig.Builder().transactionalReads(false). .. .build();
      eventStore = new SpringMongoEventStore(mongoTemplate, eventStoreConfig);
- Added ability to tweak query options for reads in the event store, for example cursor timeouts, allowing reads from secondaries, etc. You can configure this in the `EventStoreConfig` for each event store by using the `queryOptions` higher-order function. For example:

      EventStoreConfig eventStoreConfig = new EventStoreConfig.Builder()
              .eventStoreCollectionName(connectionString.getCollection())
              .transactionConfig(mongoTransactionManager)
              .timeRepresentation(TimeRepresentation.DATE)
              .queryOptions(query -> query.noCursorTimeout().allowSecondaryReads())
              .build();
      var eventStore = new SpringMongoEventStore(mongoTemplate, eventStoreConfig);
  Note that you must not use this to change the query itself, i.e. don’t use `Query#with(Sort)` etc. Only use options such as `Query#cursorBatchSize(int)` that don’t change the actual query or sort order. This is an advanced feature and should be used sparingly.
- Added ability to convert a `Stream` of cloud events to domain events and vice versa in the `CloudEventConverter` by overriding the new `toCloudEvents` and/or `toDomainEvents` methods. The reason for overriding any of these methods is to allow adding things such as a correlation id that should be the same for all events in a stream.
- Non-backward compatible change: The cloud event converter module name has changed from `org.occurrent:cloudevent-converter` to `org.occurrent:cloudevent-converter-api`.
- Non-backward compatible change: The generic cloud event converter (`org.occurrent.application.converter.generic.GenericCloudEventConverter`) has been moved to its own module. Depend on `org.occurrent:cloudevent-converter-generic` to use it.
- Introduced a cloud event converter that uses XStream to (de-)serialize the domain event to cloud event data. Depend on `org.occurrent:cloudevent-converter-xstream` and then use it like this:

      XStream xStream = new XStream();
      xStream.allowTypeHierarchy(MyDomainEvent.class);
      XStreamCloudEventConverter<MyDomainEvent> cloudEventConverter = new XStreamCloudEventConverter<>(xStream, URI.create("urn:occurrent:domain"));

  You can also configure how different attributes of the domain event should be represented in the cloud event by using the builder, `new XStreamCloudEventConverter.Builder<MyDomainEvent>().. build()`.
- Introduced a cloud event converter that uses Jackson to (de-)serialize the domain event to cloud event data. Depend on `org.occurrent:cloudevent-converter-jackson` and then use it like this:

      ObjectMapper objectMapper = new ObjectMapper();
      JacksonCloudEventConverter<MyDomainEvent> cloudEventConverter = new JacksonCloudEventConverter<>(objectMapper, URI.create("urn:occurrent:domain"));

  You can also configure how different attributes of the domain event should be represented in the cloud event by using the builder, `new JacksonCloudEventConverter.Builder<MyDomainEvent>().. build()`.
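
To make the trade-off of disabling transactional reads more concrete, here is a minimal in-memory sketch of the stale-version scenario described in the changelog above. It does not use Occurrent or MongoDB: `StaleVersionSketch`, `InMemoryStream`, `EventStream` (a simplified stand-in, not Occurrent's class) and `nonTransactionalRead` are all hypothetical names, and the "other thread" is simulated by a callback that runs between read A and read B.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation only; none of these types are part of Occurrent.
public class StaleVersionSketch {

    /** Simplified result of a stream read: version from read A, events from read B. */
    public static final class EventStream {
        public final long version;
        public final List<String> events;

        public EventStream(long version, List<String> events) {
            this.version = version;
            this.events = events;
        }
    }

    /** In-memory stream whose version equals the number of events written so far. */
    public static final class InMemoryStream {
        private final List<String> events = new ArrayList<>();

        public synchronized void write(String event) {
            events.add(event);
        }

        // Read A: the current version of the stream.
        public synchronized long currentVersion() {
            return events.size();
        }

        // Read B: a snapshot of all events in the stream.
        public synchronized List<String> allEvents() {
            return new ArrayList<>(events);
        }
    }

    /**
     * Performs read A and read B as two independent reads (no transaction).
     * The betweenReads callback stands in for a concurrent writer that
     * appends events after read A but before read B.
     */
    public static EventStream nonTransactionalRead(InMemoryStream stream, Runnable betweenReads) {
        long version = stream.currentVersion();   // read A
        betweenReads.run();                       // concurrent write happens here
        List<String> events = stream.allEvents(); // read B
        return new EventStream(version, events);
    }

    public static void main(String[] args) {
        InMemoryStream stream = new InMemoryStream();
        stream.write("NameDefined");
        stream.write("NameChanged");

        EventStream result =
                nonTransactionalRead(stream, () -> stream.write("NameChangedAgain"));

        // The version reflects two events, but three events were returned.
        System.out.println("version=" + result.version + ", events=" + result.events.size());
    }
}
```

Running `main` prints `version=2, events=3`: the version from read A is already stale by the time read B returns. With transactional reads enabled, both reads would observe the same snapshot, which is why the setting is best left on unless one of the two reasons above applies.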