Documentation

The documentation on this page is always for the latest version of Occurrent, currently 0.19.8.

If you like Occurrent, please consider starring us on GitHub.

Introduction

Occurrent is in an early stage, so APIs, and even the data model, are subject to change in the future.

Occurrent is an event sourcing library, or if you wish, a set of event sourcing utilities for the JVM, created by Johan Haleby. There are many options for doing event sourcing in Java already so why build another one? There are a few reasons for this besides the intrinsic joy of doing something yourself:

Concepts

Event Sourcing

Every system needs to store and update data somehow. Many times this is done by storing the current state of an entity in the database. For example, you might have an entity called Order stored in an order table in a relational database. Every time something happens to the order, the table is updated with the new information, replacing the previous values. Event Sourcing is a technique that instead stores the changes, represented by events, that occurred for the entity. Events are facts, things that have happened, and they should never be updated. This means that not only can you derive the current state from the set of historic events, but you also know which steps were involved in reaching the current state.
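To make "deriving the current state from the set of historic events" concrete, here is a minimal sketch in plain Java. It doesn't use Occurrent at all; the event types, the OrderState record and the evolve/replay functions are hypothetical names invented for this illustration.

```java
import java.util.List;

// Hypothetical events for an order; the names are illustrative only.
interface OrderEvent {}
record OrderPlaced(String orderId, int quantity) implements OrderEvent {}
record QuantityChanged(int newQuantity) implements OrderEvent {}
record OrderShipped() implements OrderEvent {}

// The current state is a value derived from the events, never stored directly.
record OrderState(String orderId, int quantity, boolean shipped) {
    static final OrderState INITIAL = new OrderState(null, 0, false);
}

final class OrderReplay {
    // Apply a single event to the previous state and return the next state.
    static OrderState evolve(OrderState state, OrderEvent event) {
        if (event instanceof OrderPlaced e) {
            return new OrderState(e.orderId(), e.quantity(), false);
        } else if (event instanceof QuantityChanged e) {
            return new OrderState(state.orderId(), e.newQuantity(), state.shipped());
        } else if (event instanceof OrderShipped) {
            return new OrderState(state.orderId(), state.quantity(), true);
        }
        return state; // Unknown events leave the state unchanged
    }

    // Fold over the full history, oldest event first.
    static OrderState replay(List<OrderEvent> history) {
        OrderState state = OrderState.INITIAL;
        for (OrderEvent event : history) {
            state = evolve(state, event);
        }
        return state;
    }
}
```

Replaying only a prefix of the history gives the state as it was at that point in time, which is the "you also know the steps" benefit in practice.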

CloudEvents

CloudEvents is a CNCF specification for describing event data in a common way. It seeks to dramatically simplify event declaration and delivery across services, platforms, and beyond. In Occurrent, you don’t persist your domain events directly to an event store; instead you convert them to cloud events. You may regard a CloudEvent as a standardized envelope around the data in your domain event.

In practice, this means that instead of storing events in a proprietary or arbitrary format, Occurrent stores events in accordance with the cloud event specification, even at the data-store level. I.e. you know the structure of your events, even in the database that the event store uses. It’s up to you as a user of the library to convert your domain events into cloud events when writing to the event store. This is extremely powerful: not only does it allow you to design your domain events in any way you see fit (for example without compromises enforced by a JSON serialization library), but it also allows for easier migration, data consistency and features such as (fully-consistent) queries to the event store for certain use cases. A cloud event is made up of a set of pre-defined attributes described in the cloud event specification. In the context of event sourcing, we can leverage these attributes in the way suggested below:

Cloud Event Attribute Name    Event Sourcing Nomenclature    Description
id event id The cloud event id attribute is used to store the id of a unique event in a particular context (“source”). Note that this id doesn’t necessarily need to be globally unique (but the combination of id and source must). Typically this would be a UUID.

source category You can regard the “source” attribute as the “stream type” or a “category” for certain streams. For example, if you’re creating a game, you may have two kinds of aggregates in your bounded context, a “game” and a “player”. You can regard these as two different sources (categories). These are represented as URNs, for example the “game” may have the source “urn:mycompany:mygame:game” and “player” may have “urn:mycompany:mygame:player”. This allows, for example, subscriptions to subscribe to all events related to any player (by using a subscription filter for the source attribute).

subject “subject” (~identifier) A subject describes the event in the context of the source, typically an entity (aggregate) id that all events in the stream are related to. This property is optional (because Occurrent automatically adds the streamid attribute), so you may not need to add it, but it can be quite useful. For example, a stream may not necessarily hold the contents of just a single aggregate, and if so the subject can be used to distinguish between different aggregates/entities in a stream. Another example would be if you have multiple streams that represent different aspects of the same entity. For example, if you have a game where players are awarded points based on their performance in the game after the game has ended, you may decide to represent “point awarding” and “game play” as different streams, but they refer to the same “game id”. You can then use the “game id” as subject.

type event type The type of the event. It may be enough to just use the name of the domain event, such as “GameStarted”, but you may also consider using a URN (e.g. “urn:mycompany:game:started”) or qualifying it (“com.mycompany.game.started”). Note that you should try to avoid using the fully-qualified class name of the domain event, since you’ll run into trouble if you move the domain event to a different package.

time event time The time when the event occurred (typically the application time rather than the processing time), formatted according to RFC 3339 (represented as java.time.OffsetDateTime by the CloudEvent SDK).

datacontenttype content-type The content-type of the data attribute. Typically you want to use “application/json”, which is also the default if you don’t specify any content-type at all.

dataschema schema The URI to a schema describing the data in the cloud event (optional).

data event data The actual data needed to represent your domain event, for example the contents of a GameStarted event. You can leave out this attribute entirely if your event is fully described by other attributes.

Note that the table above is to be regarded as a rule of thumb, it’s ok to map things differently if it’s better suited for your application, but it’s a good idea to keep things consistent throughout your organization. To see an example of how this may look in code, refer to the application service documentation.
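As a rough illustration of the mapping in the table, the sketch below converts a made-up GameStarted domain event into the corresponding attributes. To keep it self-contained, a plain Map stands in for a real CloudEvent (in an actual application you would use the CloudEvent SDK builder instead), and GameStarted, GameStartedMapper and the hand-written JSON are assumptions made for this example only.

```java
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical domain event, used only for illustration.
record GameStarted(UUID eventId, String gameId, String wordToGuess, OffsetDateTime startedAt) {}

final class GameStartedMapper {
    // Stable event type name, decoupled from the Java class name so that
    // moving or renaming the class doesn't change what is stored.
    static final String TYPE = "GameStarted";
    static final URI SOURCE = URI.create("urn:mycompany:mygame:game");

    // Builds the attributes following the rule-of-thumb table above.
    // A plain Map stands in for a real CloudEvent here.
    static Map<String, Object> toAttributes(GameStarted e) {
        // Naive JSON for brevity; a real application would use a JSON library
        // that takes care of escaping.
        String json = "{\"wordToGuess\":\"" + e.wordToGuess() + "\"}";

        Map<String, Object> attributes = new LinkedHashMap<>();
        attributes.put("id", e.eventId().toString());
        attributes.put("source", SOURCE);
        attributes.put("subject", e.gameId());
        attributes.put("type", TYPE);
        attributes.put("time", e.startedAt());
        attributes.put("datacontenttype", "application/json");
        attributes.put("data", json.getBytes(StandardCharsets.UTF_8));
        return attributes;
    }
}
```

Note how the game id ends up in "subject" while the word to guess, which only matters to this event, ends up in "data".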

Occurrent CloudEvent Extensions

Occurrent automatically adds two extension attributes to each cloud event written to the event store:

Attribute Name Type Description
streamid String An id that uniquely identifies a particular event stream. It’s used to determine which events belong to which stream.
streamversion Long The id of the stream version for a particular event. It’s used for optimistic concurrency control.

These are required for Occurrent to operate. A long-term goal of Occurrent is to come up with a standardized set of cloud event extensions that are agreed upon and used by several different vendors.

In the meantime, it’s quite possible that Occurrent will provide a wider set of optional extensions in the future (such as correlation id and/or sequence number). But for now, it’s up to you as a user to add these if you need them (see CloudEvent Metadata). You would typically do this by creating, or extending/wrapping, an application service.

CloudEvent Metadata

You can add metadata to the cloud event by making use of extension attributes. This is the place to add things such as sequence number, correlation id, causation id, etc. There are already standard cloud event extensions for distributed tracing and sequence number generation that might be of interest.
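One way to picture this is a small decorator that stamps extension attributes onto every event before it's written. The sketch below is only an assumption about how you might structure it: events are modelled as plain attribute maps rather than real CloudEvents, and MetadataDecorator and the "correlationid" attribute name are made up for the example (the cloud event specification restricts attribute names to lower-case letters and digits, hence no camel case).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;

final class MetadataDecorator {
    // Returns a function that copies each event (modelled as an attribute map)
    // and adds the given extension attributes; the originals are not mutated.
    static UnaryOperator<Stream<Map<String, Object>>> withExtensions(Map<String, Object> extensions) {
        return events -> events.map(event -> {
            Map<String, Object> copy = new HashMap<>(event);
            copy.putAll(extensions);
            return copy;
        });
    }
}
```

A wrapping application service could apply such a function to the events returned by the domain model just before writing them to the event store.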

EventStore

The event store is a place where you store events. Events are immutable pieces of data describing state changes for a particular stream. A stream is a collection of events that are related, typically but not limited to, a particular entity. For example a stream may include all events for a particular instance of a game or an order.

Occurrent provides an interface, EventStore, that allows you to read events from, and write events to, the database. The EventStore interface is actually composed of various smaller interfaces, since not all databases support all aspects provided by the EventStore interface. Here’s an example that writes a cloud event to the event store and reads it back:

CloudEvent event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(OffsetDateTime.now(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }".getBytes(StandardCharsets.UTF_8))
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event));

// Read
EventStream<CloudEvent> eventStream = eventStore.read("streamId");
val event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(OffsetDateTime.now(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }".toByteArray())
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event))

// Read
val eventStream : EventStream<CloudEvent> = eventStore.read("streamId")

Note that when reading the events, the EventStore won’t simply return a Stream of CloudEvents; instead it returns a wrapper called EventStream.

EventStream

The EventStream contains the CloudEvents for a stream and the version of the stream. The version can be used to guarantee that only one thread/process is allowed to write to the stream at the same time, i.e. optimistic locking. This can be achieved by including the version in a write condition.

Note that reading a stream that doesn’t exist (e.g. eventStore.read("non-existing-id")) will return an instance of EventStream with an empty stream of events and 0 as version number. The reason for this is that you can use the same “application service” (a fancy word for a piece of code that loads events from the event store, applies them to the domain model and writes the new events returned to the event store) for both entity creation and subsequent use cases. For example, consider this simple domain model:

public class WordGuessingGame {
	public static Stream<CloudEvent> startNewGame(String gameId, String wordToGuess) {	
		...
	}

	public static Stream<CloudEvent> guessWord(Stream<CloudEvent> eventStream, String word) {
		...
	}
}
// Note that the functions might as well be placed directly in a package
 object WordGuessingGame {
    fun startNewGame(gameId : String, wordToGuess : String) : Stream<CloudEvent> = ...	
 
    fun guessWord(eventStream : Stream<CloudEvent>, word : String) : Stream<CloudEvent> = ...
 }

Then we could write a generic application service that takes a higher-order function (Stream<CloudEvent>) -> Stream<CloudEvent>:

public class ApplicationService {

    private final EventStore eventStore;

    public ApplicationService(EventStore eventStore) {
        this.eventStore = eventStore;
    }

    public void execute(String streamId, Function<Stream<CloudEvent>, Stream<CloudEvent>> functionThatCallsDomainModel) {
        // Read all events from the event store for a particular stream
        EventStream<CloudEvent> eventStream = eventStore.read(streamId);

        // Invoke the domain model  
        Stream<CloudEvent> newEvents = functionThatCallsDomainModel.apply(eventStream.events());

        // Persist the new events  
        eventStore.write(streamId, eventStream.version(), newEvents);
    }
}
class ApplicationService constructor (val eventStore : EventStore) {

    fun execute(streamId : String, functionThatCallsDomainModel : (Stream<CloudEvent>) -> Stream<CloudEvent>) {
        // Read all events from the event store for a particular stream
        val  eventStream : EventStream<CloudEvent> = eventStore.read(streamId)
        
         // Invoke the domain model 
        val newEvents = functionThatCallsDomainModel(eventStream.events())

        // Persist the new events
        eventStore.write(streamId, eventStream.version(), newEvents)
    }
}
Note that typically the domain model, WordGuessingGame in this example, would not return CloudEvents but rather a stream or list of a custom data structure, domain events, that would then be converted to CloudEvents. This is not shown in the example above for brevity.

You could then call the application service like this, regardless of whether you’re starting a new game or not:

// Here we imagine that we have received the data required to start a new game, e.g. from a REST endpoint.
String gameId = ...;
String wordToGuess = ...;

// Then we invoke the application service to start a game:
applicationService.execute(gameId, __ -> WordGuessingGame.startNewGame(gameId, wordToGuess));

// Later a player guesses a word:
String gameId = ...;
String guess = ...;

// We thus invoke the application service again to guess the word:
applicationService.execute(gameId, events -> WordGuessingGame.guessWord(events, guess));
// Here we imagine that we have received the data required to start a new game, e.g. from a REST endpoint.
val gameId : String = ...
val wordToGuess : String = ...

// Then we invoke the application service to start a game:
applicationService.execute(gameId) {
    WordGuessingGame.startNewGame(gameId, wordToGuess)
}

// Later a player guesses a word:
val gameId : String = ...
val guess : String = ...

// We thus invoke the application service again to guess the word:
applicationService.execute(gameId) { events ->
    WordGuessingGame.guessWord(events, guess)
}

Writing application services like this is both powerful and simple (once you get used to it). There’s less need for explicit commands and command handlers (the application service is a kind of command handler). You can also use other functional techniques such as partial application to make the code look, arguably, even nicer. It’s also easy to compose several calls to the domain model into one by using standard functional composition techniques. For example in this case you might consider both starting a game and let the player make her first guess from a single request to the REST API. No need to change the domain model to do this, just use function composition.
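To give an idea of what such composition could look like, here is a minimal sketch of a combinator for two domain functions. It's generic over the event type so that it runs without any dependencies; DomainFunctions and andThen are hypothetical names, not something provided by Occurrent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class DomainFunctions {
    // Composes two domain functions so that the second one sees the existing
    // events plus the events produced by the first. The result contains the
    // new events from both, in order.
    static <E> Function<Stream<E>, Stream<E>> andThen(
            Function<Stream<E>, Stream<E>> first,
            Function<Stream<E>, Stream<E>> second) {
        return events -> {
            // A Stream can only be consumed once, so materialize the history
            List<E> history = events.collect(Collectors.toList());
            List<E> fromFirst = first.apply(history.stream()).collect(Collectors.toList());

            List<E> historyAfterFirst = new ArrayList<>(history);
            historyAfterFirst.addAll(fromFirst);
            List<E> fromSecond = second.apply(historyAfterFirst.stream()).collect(Collectors.toList());

            return Stream.concat(fromFirst.stream(), fromSecond.stream());
        };
    }
}
```

With this in place, "start a game and make the first guess" becomes a single function that can be passed to the application service, so both sets of events are written together.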

Write Condition

A “write condition” can be used to specify conditional writes to the event store. Typically, the purpose of this would be to achieve optimistic concurrency control (optimistic locking) of an event stream.

For example, imagine you have an Account to which you can deposit and withdraw money. A business rule says that it’s not allowed to have a negative balance on an account. Now imagine an account that is shared between two persons and contains 20 EUR. Person “A” wants to withdraw 15 EUR and person “B” wants to withdraw 10 EUR. If they try to do this, an error message should be presented to one of them since the account balance would be negative. But what happens if both persons try to withdraw the money at the same time? Let’s have a look:

// Person A at _time 1_
EventStream<CloudEvent> eventStream = eventStore.read("account1"); // A

// "withdraw" is a pure function in the Account domain model which takes a Stream
//  of all current events and the amount to withdraw, and returns new events. 
// In this case, a "MoneyWasWithdrawn" event is returned,  since 15 EUR is OK to withdraw.     
Stream<CloudEvent> events = Account.withdraw(eventStream.events(), Money.of(15, EUR));

// We write the new events to the event store  
eventStore.write("account1", events);

// Now in a different thread let's imagine Person B at _time 1_
EventStream<CloudEvent> eventStream = eventStore.read("account1"); // B

// Again we want to withdraw money, and the system will think this is OK,
// since the event stream that B read doesn't yet contain A's withdrawal.
Stream<CloudEvent> events = Account.withdraw(eventStream.events(), Money.of(10, EUR));

// We write the new events to the event store without any problems! 😱 
// But this shouldn't work since it would violate the business rule!   
eventStore.write("account1", events);
// Person A at _time 1_
val eventStream = eventStore.read("account1") // A

// "withdraw" is a pure function in the Account domain model which takes a Stream
//  of all current events and the amount to withdraw. It returns a stream of 
// new events, in this case only a "MoneyWasWithdrawn" event,  since 15 EUR is OK to withdraw.     
val events = Account.withdraw(eventStream.events(), Money.of(15, EUR))

// We write the new events to the event store  
eventStore.write("account1", events)

// Now in a different thread let's imagine Person B at _time 1_
val eventStream = eventStore.read("account1") // B

// Again we want to withdraw money, and the system will think this is OK,
// since the Account thinks that the balance will be 10 EUR after
// the withdrawal.
val events = Account.withdraw(eventStream.events(), Money.of(10, EUR))

// We write the new events to the event store without any problems! 😱 
// But this shouldn't work since it would violate the business rule!   
eventStore.write("account1", events)
Note that typically the domain model, Account in this example, would not return CloudEvents but rather a stream or list of a custom data structure, domain events, that would then be converted to CloudEvents. This is not shown in the example above for brevity; look at the command section for a more real-life example.

To avoid the problem above we want to make use of conditional writes. Let’s see how:

// Person A at _time 1_
EventStream<CloudEvent> eventStream = eventStore.read("account1"); // A
long currentVersion = eventStream.version(); 

// Withdraw money
Stream<CloudEvent> events = Account.withdraw(eventStream.events(), Money.of(15, EUR));

// We write the new events to the event store with a write condition that implies
// that the version of the event stream must be A.   
eventStore.write("account1", currentVersion, events);

// Now in a different thread let's imagine Person B at _time 1_
EventStream<CloudEvent> eventStream = eventStore.read("account1"); // B
long currentVersion = eventStream.version();

// Again we want to withdraw money, and the system will think this is OK,
// since the event stream that B read doesn't yet contain A's withdrawal.
Stream<CloudEvent> events = Account.withdraw(eventStream.events(), Money.of(10, EUR));

// We write the new events to the event store with a write condition that implies
// that the version of the event stream must be B. But now Occurrent will throw
// a "org.occurrent.eventstore.api.WriteConditionNotFulfilledException" since, in this
// case A was slightly faster, and the version of the event stream no longer matches!
// The entire operation should be retried for person B and when "Account.withdraw(..)"
// is called again it could throw a "CannotWithdrawSinceBalanceWouldBeNegative" exception. 
eventStore.write("account1", currentVersion, events); 
// Person A at _time 1_
val eventStream = eventStore.read("account1") // A
val currentVersion = eventStream.version() 

// Withdraw money
val events = Account.withdraw(eventStream.events(), Money.of(15, EUR))

// We write the new events to the event store with a write condition that implies
// that the version of the event stream must be A.   
eventStore.write("account1", currentVersion, events)

// Now in a different thread let's imagine Person B at _time 1_
val eventStream = eventStore.read("account1") // B
val currentVersion = eventStream.version()

// Again we want to withdraw money, and the system will think this is OK,
// since the event stream that B read doesn't yet contain A's withdrawal.
val events = Account.withdraw(eventStream.events(), Money.of(10, EUR))

// We write the new events to the event store with a write condition that implies
// that the version of the event stream must be B. But now Occurrent will throw
// a "org.occurrent.eventstore.api.WriteConditionNotFulfilledException" since, in this
// case A was slightly faster, and the version of the event stream no longer matches!
// The entire operation should be retried for person B and when "Account.withdraw(..)"
// is called again it could throw a "CannotWithdrawSinceBalanceWouldBeNegative" exception. 
eventStore.write("account1", currentVersion, events) 

What you’ve seen above is a simple, but widely used, form of write condition. Actually, doing eventStore.write("streamId", version, events) is just a shortcut for:

eventStore.write("streamId", WriteCondition.streamVersionEq(version), events);
eventStore.write("streamId", WriteCondition.streamVersionEq(version), events)
WriteCondition can be imported from "org.occurrent.eventstore.api.WriteCondition".

But you can compose a more advanced write condition using a Condition:

eventStore.write("streamId", WriteCondition.streamVersion(and(lt(10), ne(5))), events);
eventStore.write("streamId", WriteCondition.streamVersion(and(lt(10), ne(5))), events)

where lt, ne and and are statically imported from org.occurrent.condition.Condition.
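If you're curious about the mechanics behind a version-based write condition, and what "retrying the entire operation" could look like, here is a simplified in-memory sketch. It is not how Occurrent is implemented: InMemoryStore, VersionMismatchException (standing in for WriteConditionNotFulfilledException) and RetryingService are hypothetical names, events are plain strings, and the version is simply the number of events in the stream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for Occurrent's WriteConditionNotFulfilledException.
class VersionMismatchException extends RuntimeException {
    VersionMismatchException(String message) { super(message); }
}

// Minimal in-memory store; the version of a stream is the number of events
// in it (0 for a stream that doesn't exist yet).
final class InMemoryStore {
    private final Map<String, List<String>> streams = new HashMap<>();

    synchronized List<String> read(String streamId) {
        return new ArrayList<>(streams.getOrDefault(streamId, List.of()));
    }

    // The check and the append happen atomically, which is what makes the
    // condition meaningful when several writers compete.
    synchronized void write(String streamId, long expectedVersion, List<String> newEvents) {
        List<String> stream = streams.computeIfAbsent(streamId, id -> new ArrayList<>());
        if (stream.size() != expectedVersion) {
            throw new VersionMismatchException(
                    "Expected version " + expectedVersion + " but was " + stream.size());
        }
        stream.addAll(newEvents);
    }
}

final class RetryingService {
    // Re-reads the stream and re-runs the domain function when a concurrent
    // writer got there first, up to maxAttempts times.
    static void execute(InMemoryStore store, String streamId, int maxAttempts,
                        Function<List<String>, List<String>> domainFunction) {
        for (int attempt = 1; ; attempt++) {
            List<String> events = store.read(streamId);
            long version = events.size();
            try {
                store.write(streamId, version, domainFunction.apply(events));
                return;
            } catch (VersionMismatchException e) {
                if (attempt >= maxAttempts) throw e;
            }
        }
    }
}
```

The important detail is that a retry runs the domain function again on the fresh events, so in the account example person B's second attempt would see A's withdrawal and could be rejected by the business rule.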

EventStore Queries

Since Occurrent builds on top of existing databases it’s ok, given that you know what you’re doing*, to use the strengths of these databases. One such strength is that databases typically have good querying support. Occurrent exposes this with the EventStoreQueries interface that an EventStore implementation may implement to expose querying capabilities. For example:

OffsetDateTime lastTwoHours = OffsetDateTime.now().minusHours(2); 
// Query the database for all events from the last two hours that have "subject" equal to "123" and sort these in descending order
Stream<CloudEvent> events = eventStore.query(subject("123").and(time(gte(lastTwoHours))), SortBy.time(DESCENDING));
val lastTwoHours = OffsetDateTime.now().minusHours(2)
// Query the database for all events from the last two hours that have "subject" equal to "123" and sort these in descending order
val events : Stream<CloudEvent> = eventStore.query(subject("123").and(time(gte(lastTwoHours))), SortBy.time(DESCENDING))
*There's a trade-off when it's appropriate to query the database vs creating materialized views/projections and you should most likely create indexes to allow for fast queries.

The subject and time methods are statically imported from org.occurrent.filter.Filter and gte is statically imported from org.occurrent.condition.Condition.

EventStoreQueries is not bound to a particular stream, rather you can query any stream (or multiple streams at the same time). It also provides the ability to get an “all” stream:

// Return all events in an event store sorted by time in descending order
Stream<CloudEvent> events = eventStore.all(SortBy.time(DESCENDING));
// Return all events in an event store sorted by time in descending order
val events : Stream<CloudEvent> = eventStore.all(SortBy.time(DESCENDING))

The EventStoreQueries interface also supports skip and limit, which allows for pagination:

// Skip 42, limit 1024
Stream<CloudEvent> events = eventStore.all(42, 1024);
// Skip 42, limit 1024
val events : Stream<CloudEvent> = eventStore.all(42, 1024)
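To illustrate the semantics of filtering, sorting and skip/limit independently of any database, here is a small in-memory analogue built on java.util.stream. It's only a sketch: StoredEvent and InMemoryQueries are hypothetical names, and a real EventStoreQueries implementation would translate the query to the underlying database instead of scanning a list.

```java
import java.time.OffsetDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical, simplified event carrying only the attributes used here.
record StoredEvent(String id, String subject, OffsetDateTime time) {}

final class InMemoryQueries {
    // Filters, sorts by time in descending order, then applies skip and limit.
    static List<StoredEvent> query(List<StoredEvent> all, Predicate<StoredEvent> filter,
                                   long skip, long limit) {
        return all.stream()
                .filter(filter)
                .sorted(Comparator.comparing(StoredEvent::time).reversed())
                .skip(skip)
                .limit(limit)
                .collect(Collectors.toList());
    }

    // Matches events whose subject equals the expected value.
    static Predicate<StoredEvent> subject(String expected) {
        return e -> expected.equals(e.subject());
    }

    // Matches events that occurred at or after the given time.
    static Predicate<StoredEvent> timeGte(OffsetDateTime from) {
        return e -> !e.time().isBefore(from);
    }
}
```

Fetching page n of size s then amounts to calling query with skip = n * s and limit = s, which only gives stable pages if the sort order is deterministic.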

To get started with an event store refer to Choosing An EventStore.

Subscriptions

A subscription is a way to get notified when new events are written to an event store. Typically, a subscription will be used to create views from events (such as projections, sagas, snapshots etc) or create integration events that can be forwarded to another piece of infrastructure such as a message bus. There are two different kinds of APIs: the first one is a blocking API represented by the org.occurrent.subscription.api.blocking.SubscriptionModel interface (in the org.occurrent:subscription-api-blocking module), and the second one is a reactive API represented by the org.occurrent.subscription.api.reactor.SubscriptionModel interface (in the org.occurrent:subscription-api-reactor module).

The blocking API is callback based, which is fine if you’re working with individual events (you can of course use a simple function that aggregates events into batches yourself). If you want to work with streams of data, the reactor SubscriptionModel is probably a better option since it’s using the Flux publisher from project reactor.

Note that it’s fine to use the reactive SubscriptionModel even though the event store is implemented using the blocking API, and vice versa. If the datastore allows it, you can also run subscriptions in a different process than the processes reading and writing to the event store.
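As for aggregating events into batches yourself when using the callback-based API, a minimal sketch could look like the following. BatchingConsumer is a hypothetical helper, not part of Occurrent; it only assumes that the subscription invokes a per-event callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Wraps a batch handler in a per-event callback: events are buffered and
// handed over once batchSize events have been collected.
final class BatchingConsumer<E> implements Consumer<E> {
    private final int batchSize;
    private final Consumer<List<E>> batchHandler;
    private final List<E> buffer = new ArrayList<>();

    BatchingConsumer(int batchSize, Consumer<List<E>> batchHandler) {
        if (batchSize < 1) throw new IllegalArgumentException("batchSize must be >= 1");
        this.batchSize = batchSize;
        this.batchHandler = batchHandler;
    }

    @Override
    public synchronized void accept(E event) {
        buffer.add(event);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Hands over whatever is buffered; call this on shutdown (or from a timer)
    // so that a partially filled batch isn't left waiting indefinitely.
    public synchronized void flush() {
        if (buffer.isEmpty()) return;
        List<E> batch = new ArrayList<>(buffer);
        buffer.clear();
        batchHandler.accept(batch);
    }
}
```

Keep in mind that a buffered event hasn't been processed yet when the callback returns, so you need to think about what happens to the buffer if the process stops before the next flush.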

To get started with subscriptions refer to Using Subscriptions.

EventStore Operations

Occurrent event store implementations may optionally also implement the EventStoreOperations interface. It provides means to delete a specific event, or an entire event stream. For example:

// Delete an entire event stream
eventStoreOperations.deleteEventStream("streamId");
// Delete a specific event
eventStoreOperations.deleteEvent("cloudEventId", cloudEventSource);
// This will delete all events in stream "myStream" that have a version less than or equal to 19.
eventStoreOperations.delete(streamId("myStream").and(streamVersion(lte(19L))));
// Delete an entire event stream
eventStoreOperations.deleteEventStream("streamId")
// Delete a specific event
eventStoreOperations.deleteEvent("cloudEventId", cloudEventSource)
// This will delete all events in stream "myStream" that have a version less than or equal to 19.
eventStoreOperations.delete(streamId("myStream").and(streamVersion(lte(19L))))

These are probably operations that you want to use sparingly. Typically, you never want to remove any events, but there are some cases, such as GDPR or other regulations, that require the deletion of an event or an entire event stream. You should be aware that there are other ways to solve this though. One way would be to encrypt personal data and throw away the key when the user no longer uses the service. Another would be to store personal data outside the event store.

Another reason for deleting events is if you’re implementing something like “closing the books” or certain types of snapshots, and don’t need the old events anymore.
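The "encrypt personal data and throw away the key" alternative mentioned above can be sketched with the JDK's built-in AES-GCM support. This is only an illustration under simplifying assumptions: PersonalDataVault is a hypothetical class, and keys are kept in memory here, whereas a real system would need durable and secured key storage.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Keeps one AES key per user. Deleting the key makes that user's encrypted
// personal data unreadable, while the events themselves stay untouched.
final class PersonalDataVault {
    private static final int IV_LENGTH = 12;        // bytes, the usual choice for GCM
    private static final int TAG_LENGTH_BITS = 128;

    private final Map<String, SecretKey> keys = new HashMap<>();
    private final SecureRandom random = new SecureRandom();

    // Returns IV + ciphertext, suitable for storing inside the event data.
    synchronized byte[] encrypt(String userId, String personalData) {
        try {
            SecretKey key = keys.get(userId);
            if (key == null) {
                KeyGenerator generator = KeyGenerator.getInstance("AES");
                generator.init(128);
                key = generator.generateKey();
                keys.put(userId, key);
            }
            byte[] iv = new byte[IV_LENGTH];
            random.nextBytes(iv); // A fresh IV for every encryption
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_LENGTH_BITS, iv));
            byte[] ciphertext = cipher.doFinal(personalData.getBytes(StandardCharsets.UTF_8));
            byte[] result = Arrays.copyOf(iv, iv.length + ciphertext.length);
            System.arraycopy(ciphertext, 0, result, iv.length, ciphertext.length);
            return result;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Encryption failed", e);
        }
    }

    // Returns empty when the user's key has been thrown away.
    synchronized Optional<String> decrypt(String userId, byte[] encrypted) {
        SecretKey key = keys.get(userId);
        if (key == null) return Optional.empty();
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key,
                    new GCMParameterSpec(TAG_LENGTH_BITS, encrypted, 0, IV_LENGTH));
            byte[] plain = cipher.doFinal(encrypted, IV_LENGTH, encrypted.length - IV_LENGTH);
            return Optional.of(new String(plain, StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Decryption failed", e);
        }
    }

    // "Throw away the key" when the user leaves the service.
    synchronized void forget(String userId) {
        keys.remove(userId);
    }
}
```

With this approach no event has to be deleted or updated; code that reads old events just needs to cope with personal data that can no longer be decrypted.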

Another feature provided by EventStoreOperations is the ability to update an event. Again, this is not something you normally want to do, but it can be useful for certain strategies of GDPR compliance. For example, maybe you want to remove or update personal data in an event when a user unregisters from your service. Here’s an example:

eventStoreOperations.updateEvent("cloudEventId", cloudEventSource, cloudEvent -> {
    return CloudEventBuilder.v1(cloudEvent).withData(removePersonalDetailsFrom(cloudEvent)).build();
});
eventStoreOperations.updateEvent("cloudEventId", cloudEventSource) { cloudEvent -> 
    CloudEventBuilder.v1(cloudEvent).withData(removePersonalDetailsFrom(cloudEvent)).build()
}

Views

Occurrent doesn’t have any special components for creating views/projections. Instead, you simply create a subscription in which you can create and store the view as you see fit. But this doesn’t have to be difficult! Here’s a trivial example of a view that maintains the number of ended games. It does so by increasing the “numberOfEndedGames” field in an (imaginary) database for each “GameEnded” event that is written to the event store:

// An imaginary database API
Database someDatabase = ...
// Subscribe to all "GameEnded" events by starting a subscription named "my-view" 
// and increase "numberOfEndedGames" for each ended game.   
subscriptionModel.subscribe("my-view", filter(type("GameEnded")), cloudEvent -> someDatabase.inc("numberOfEndedGames"));        
// An imaginary database API
val someDatabase : Database = ...
// Subscribe to all "GameEnded" events by starting a subscription named "my-view" 
// and increase "numberOfEndedGames" for each ended game. 
subscriptionModel.subscribe("my-view", filter(type("GameEnded"))) {  
    someDatabase.inc("numberOfEndedGames")
}

Where filter is imported from org.occurrent.subscription.OccurrentSubscriptionFilter and type is imported from org.occurrent.filter.Filter.

While this is a trivial example it shouldn’t be difficult to create a view that is backed by a JPA entity in a relational database based on a subscription.
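As a slightly richer, but still simplified, illustration, the sketch below keeps the latest known status per game. It doesn't depend on Occurrent: GameEvent and GameStatusView are hypothetical, and a ConcurrentHashMap stands in for whatever database (or JPA entity) would back the view.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical, simplified event: only the attributes the view needs.
record GameEvent(String type, String gameId) {}

// A view that keeps the latest known status per game. The map stands in for
// whatever database the view would be stored in.
final class GameStatusView implements Consumer<GameEvent> {
    private final Map<String, String> statusByGameId = new ConcurrentHashMap<>();

    @Override
    public void accept(GameEvent event) {
        switch (event.type()) {
            case "GameStarted":
                statusByGameId.put(event.gameId(), "ONGOING");
                break;
            case "GameEnded":
                statusByGameId.put(event.gameId(), "ENDED");
                break;
            default:
                // Event types the view doesn't care about are ignored
                break;
        }
    }

    String statusOf(String gameId) {
        return statusByGameId.getOrDefault(gameId, "UNKNOWN");
    }

    long numberOfEndedGames() {
        return statusByGameId.values().stream().filter("ENDED"::equals).count();
    }
}
```

In a subscription, the callback would convert each incoming cloud event to the shape the view expects (not shown here) and then pass it to accept.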

Commands

A command is used to represent an intent in an event sourced system, i.e. something that you want to do. Commands differ from events in a very important way: commands can fail or be rejected, whereas events cannot. A typical example of a command would be a data structure whose name is defined as an imperative verb, for example PlaceOrder. The resulting event, if the command is processed successfully, could then be OrderPlaced. However, in Occurrent, as explained in more detail in the Command Philosophy section below, you may start off by not using explicit data structures for commands unless you want to. In Occurrent, you can instead use pure functions to represent commands and command handling. Combine this with function composition and you have a powerful way to invoke the domain model (refer to the application service for examples).
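Here is a minimal sketch of what "a pure function as a command" could look like, including a rejection. Everything in it is made up for illustration: events are plain strings to keep the example dependency-free, and OrderCommands and OrderAlreadyPlaced are hypothetical names.

```java
import java.util.List;
import java.util.function.Function;

final class OrderCommands {
    // Thrown when a command is rejected; events, in contrast, are never rejected.
    static final class OrderAlreadyPlaced extends RuntimeException {
        OrderAlreadyPlaced(String orderId) {
            super("Order " + orderId + " is already placed");
        }
    }

    // "PlaceOrder" expressed as a pure function: given the history it either
    // returns the new events or rejects the intent by throwing.
    static List<String> placeOrder(List<String> history, String orderId) {
        String orderPlaced = "OrderPlaced:" + orderId;
        if (history.contains(orderPlaced)) {
            throw new OrderAlreadyPlaced(orderId);
        }
        return List.of(orderPlaced);
    }

    // Partial application: fix the command arguments up front and get back a
    // function of the history only, the shape an application service expects.
    static Function<List<String>, List<String>> placeOrder(String orderId) {
        return history -> placeOrder(history, orderId);
    }
}
```

The partially applied function plays the role of the command: it captures the intent and its arguments, and it's only evaluated once the history has been loaded.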

Command Philosophy

Occurrent doesn’t contain a built-in command bus. Instead, you’re encouraged to pick any infrastructure component you need to act as the command bus to send commands to another service. Personally, I typically make a call to a REST API or make an RPC invocation instead of using a distributed command bus that routes the commands to my aggregate. There are of course exceptions to this, such as the need for location transparency or if you’re using Deciders. If you need location transparency, a command bus or an actor model can be of help. But I would argue that you may not always need the complexity that comes with prematurely going down this route if your business requirements don’t point you in this direction. Deciders are a nice alternative that doesn’t require any additional infrastructure.

But what about internally? For example if a service exposes a REST API and upon receiving a request it publishes a command that’s somehow picked up and routed to a function in your domain model. This is where an application service becomes useful. However, let’s first explore the rationale behind the philosophy of Occurrent. In other frameworks, it’s not uncommon that you define your domain model like this:

public class WordGuessingGame extends AggregateRoot {

    @AggregateId
    private String gameId;
    private String wordToGuess;

    @HandleCommand
    public void handle(StartNewGameCommand startNewGameCommand) {
        // Insert some validation and logic
        ... 
        // Publish an event using the "publish" method from AggregateRoot
        publish(new WordGuessingGameWasStartedEvent(...));
    }
    
    @HandleCommand
    public void handle(GuessWordCommand guessWordCommand) {
        // Some validation and implementation ...
        ...	
        
        // Publish an event using the "publish" method from AggregateRoot
        publish(new WordGuessedEvent(...));
    }

    @HandleEvent
    public void handle(WordGuessingGameWasStartedEvent e) {
        this.gameId = e.getGameId();
        this.wordToGuess = e.getWordToGuess();    
    }        

    ... 
}
 class WordGuessingGame : AggregateRoot() {
 
     @AggregateId
     lateinit var gameId : String
     lateinit var wordToGuess : String
 
     @HandleCommand
     fun handle(startNewGameCommand : StartNewGameCommand) {
         // Insert some validation and logic
         ... 
         // Publish an event using the "publish" method from AggregateRoot
         publish(WordGuessingGameWasStartedEvent(...))
     }
     
     @HandleCommand
     fun handle(guessWordCommand : GuessWordCommand) {
         // Some validation and implementation ...
         ...	
         
         // Publish an event using the "publish" method from AggregateRoot
         publish(WordGuessedEvent(...))
     }
 
     @HandleEvent
     fun handle(e : WordGuessingGameWasStartedEvent) {
         gameId = e.getGameId()
         wordToGuess = e.getWordToGuess()    
     }        
 
     ... 
 }
This is a made-up example of an imaginary event sourcing framework; it's not how you're encouraged to implement a domain model using Occurrent.

Let’s look at a “command” and see what it typically looks like in these frameworks:

public class StartNewGameCommand {
    
    @AggregateId
    private String gameId;
    private String wordToGuess;
    
    public void setGameId(String gameId) {
        this.gameId = gameId;
    }
    
    public String getGameId() {
        return gameId;
    }

    public void setWordToGuess(String wordToGuess) {
        this.wordToGuess = wordToGuess;
    }
    
    public String getWordToGuess() {
        return wordToGuess;
    }
    
    // Equals/hashCode/toString methods are excluded for brevity
}
data class StartNewGameCommand(@AggregateId var gameId: String, val wordToGuess : String)

Now that we have our WordGuessingGame implementation and a command we can dispatch it to a command bus:

commandbus.dispatch(new StartNewGameCommand("someGameId", "Secret word"));
commandbus.dispatch(StartNewGameCommand("someGameId", "Secret word"))

From a typical Java perspective one could argue that this is not too bad. But it does have a few things one could improve upon from a broader perspective:

  1. The WordGuessingGame is complecting several things that may be modelled separately. Data, state, behavior, command- and event routing and event publishing are all defined in the same model (the WordGuessingGame class). It also uses framework specific annotations, classes and inheritance inside your domain model which is something you want to avoid. For small examples like this it arguably doesn’t matter, but if you have complex logic and a large system, it probably will in my experience. Keeping state and behavior separate allows for easier testing, referential transparency and function composition. It allows treating the state as a value which has many benefits.
  2. Commands are defined as explicit data structures with framework-specific annotations when arguably they don’t have to be. This is fine if you need to serialize the command (in order to send it to another location or to schedule it for the future) but one could argue that you don’t want to couple your commands to some infrastructure. This is of course a trade-off; in Occurrent you’re free to choose any approach you like (i.e. commands/functions can be completely free of framework/library/infrastructure concerns).

Commands in Occurrent

So how would one dispatch commands in Occurrent? As we’ve already mentioned, there’s nothing stopping you from using a (distributed) command bus or from creating explicit commands and dispatching them the way we did in the example above. For example, if you’re using Deciders, it could be nice to have commands explicitly defined. But if you recognize some of the points described above and are looking for a simpler approach, here’s another way to go about it. First let’s refactor the domain model to pure functions, without any state or dependencies on Occurrent or any other library/framework.

public class WordGuessingGame {
	public static Stream<DomainEvent> startNewGame(String gameId, String wordToGuess) {	
		...
	}

	public static Stream<DomainEvent> guessWord(Stream<DomainEvent> eventStream, String word) {
		...
	}
}
// Note that the functions might as well be placed directly in a package.
// You might also want to use a Kotlin Sequence or List instead of a Stream
 object WordGuessingGame {
    fun startNewGame(gameId : String, wordToGuess : String) : Stream<DomainEvent> = ...	
 
    fun guessWord(eventStream : Stream<DomainEvent>, word : String) : Stream<DomainEvent> = ...
 }

If you define your behavior like this it’ll be easy to test (and also to compose using normal function composition techniques). There are no side-effects (such as publishing events) which also allows for easier testing and local reasoning.
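
To make the testability claim concrete, here is a minimal, self-contained sketch of what such pure functions could look like. The GameStarted and WordGuessed events, and the rule that a guess is compared case-insensitively, are assumptions invented for this example; they are not part of Occurrent.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PureWordGuessingGameSketch {

    // Hypothetical domain events, invented for this sketch
    interface DomainEvent {}
    record GameStarted(String gameId, String wordToGuess) implements DomainEvent {}
    record WordGuessed(String gameId, String word, boolean correct) implements DomainEvent {}

    // Pure "command": no state and no I/O, it just returns the new events
    static Stream<DomainEvent> startNewGame(String gameId, String wordToGuess) {
        if (wordToGuess == null || wordToGuess.isBlank()) {
            throw new IllegalArgumentException("wordToGuess must not be blank");
        }
        return Stream.of(new GameStarted(gameId, wordToGuess));
    }

    // Pure "command": derives what it needs from the history and returns only the new events
    static Stream<DomainEvent> guessWord(Stream<DomainEvent> eventStream, String word) {
        GameStarted started = eventStream
                .filter(GameStarted.class::isInstance)
                .map(GameStarted.class::cast)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("Game has not been started"));
        boolean correct = started.wordToGuess().equalsIgnoreCase(word);
        return Stream.of(new WordGuessed(started.gameId(), word, correct));
    }

    public static void main(String[] args) {
        // A "test" needs nothing but plain values: no event store, no framework, no mocks
        List<DomainEvent> history = startNewGame("game1", "Secret").collect(Collectors.toList());
        List<DomainEvent> newEvents = guessWord(history.stream(), "secret").collect(Collectors.toList());
        System.out.println(newEvents);
    }
}
```

Since the functions only take values and return values, a unit test can pass in a hand-written history and compare the returned events with what it expects.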

But where are our commands!? In this example we’ve decided to represent them as functions. I.e. the “command” is modeled as a simple function, e.g. startNewGame! This means that the command handling logic is handled by a function as well. You don’t need to switch/match over the command since you directly invoke the function itself. Again, you may prefer to actually define your commands explicitly, but in this example we’ll just be using normal functions.

But wait, how are these functions called? Create or copy a generic ApplicationService class like the one below (or use the generic application service provided by Occurrent):

public class ApplicationService {

    private final EventStore eventStore;
    private final Function<CloudEvent, DomainEvent> convertCloudEventToDomainEvent;
    private final Function<DomainEvent, CloudEvent> convertDomainEventToCloudEvent;

    public ApplicationService(EventStore eventStore, 
                              Function<CloudEvent, DomainEvent> convertCloudEventToDomainEvent, 
                              Function<DomainEvent, CloudEvent> convertDomainEventToCloudEvent) {
        this.eventStore = eventStore;
        this.convertCloudEventToDomainEvent = convertCloudEventToDomainEvent;
        this.convertDomainEventToCloudEvent = convertDomainEventToCloudEvent;
    }

    public void execute(String streamId, Function<Stream<DomainEvent>, Stream<DomainEvent>> functionThatCallsDomainModel) {
        // Read all events from the event store for a particular stream
        EventStream<CloudEvent> eventStream = eventStore.read(streamId);
        // Convert the cloud events into domain events
        Stream<DomainEvent> domainEventsInStream = eventStream.events().map(convertCloudEventToDomainEvent);

        // Call a pure function from the domain model which returns a Stream of domain events  
        Stream<DomainEvent> newDomainEvents = functionThatCallsDomainModel.apply(domainEventsInStream);

        // Convert domain events to cloud events and write them to the event store  
        eventStore.write(streamId, eventStream.version(), newDomainEvents.map(convertDomainEventToCloudEvent));
    }
}
class ApplicationService constructor (val eventStore : EventStore, 
                                      val convertCloudEventToDomainEvent : (CloudEvent) -> DomainEvent, 
                                      val convertDomainEventToCloudEvent : (DomainEvent) -> CloudEvent) {

    fun execute(streamId : String, functionThatCallsDomainModel : (Stream<DomainEvent>) -> Stream<DomainEvent>) {
        // Read all events from the event store for a particular stream
        val  eventStream : EventStream<CloudEvent> = eventStore.read(streamId)
        // Convert the cloud events into domain events
        val domainEventsInStream : Stream<DomainEvent> = eventStream.events().map(convertCloudEventToDomainEvent)

        // Call a pure function from the domain model which returns a Stream of domain events
        val newDomainEvents = functionThatCallsDomainModel(domainEventsInStream)

        // Convert domain events to cloud events and write them to the event store
        eventStore.write(streamId, eventStream.version(), newDomainEvents.map(convertDomainEventToCloudEvent))
    }
}

and then use the ApplicationService like this:

// A function that converts a CloudEvent to a "domain event"
Function<CloudEvent, DomainEvent> convertCloudEventToDomainEvent = ..
// A function that converts a "domain event" to a CloudEvent
Function<DomainEvent, CloudEvent> convertDomainEventToCloudEvent = ..
EventStore eventStore = ..
ApplicationService applicationService = new ApplicationService(eventStore, convertCloudEventToDomainEvent, convertDomainEventToCloudEvent);

// Now in your REST API use the application service:
String gameId = ... // From a form parameter
String wordToGuess = .. // From a form parameter
applicationService.execute(gameId, events -> WordGuessingGame.startNewGame(gameId, wordToGuess));
// A function that converts a CloudEvent to a "domain event"
val convertCloudEventToDomainEvent : (CloudEvent) -> DomainEvent = ..
// A function that converts a "domain event" to a CloudEvent
val convertDomainEventToCloudEvent : (DomainEvent) -> CloudEvent = ..
val eventStore : EventStore = ..
val applicationService = ApplicationService(eventStore, convertCloudEventToDomainEvent, convertDomainEventToCloudEvent);

// Now in your REST API use the application service:
val gameId = ... // From a form parameter
val wordToGuess = .. // From a form parameter
applicationService.execute(gameId) { events -> 
    WordGuessingGame.startNewGame(gameId, wordToGuess)
}

We’re leveraging higher-order functions instead of using explicit commands.
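
If you want to see the read, call and write cycle in isolation, the following self-contained sketch replaces the event store with a hypothetical in-memory stand-in. The InMemoryEventStore class, the use of plain strings as events, and the choice to treat the stream version as the number of stored events are all assumptions made for this example; they are not Occurrent's API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class InMemoryApplicationServiceSketch {

    // Hypothetical in-memory stand-in for an event store; events are plain strings here
    static class InMemoryEventStore {
        private final Map<String, List<String>> streams = new HashMap<>();

        synchronized List<String> read(String streamId) {
            return new ArrayList<>(streams.getOrDefault(streamId, List.of()));
        }

        // Only write if the stream is still at the version the caller read (optimistic concurrency).
        // Assumption for this sketch: the version equals the number of events in the stream.
        synchronized void write(String streamId, long expectedVersion, Stream<String> newEvents) {
            List<String> stream = streams.computeIfAbsent(streamId, id -> new ArrayList<>());
            if (stream.size() != expectedVersion) {
                throw new IllegalStateException("Expected version " + expectedVersion + " but was " + stream.size());
            }
            stream.addAll(newEvents.collect(Collectors.toList()));
        }
    }

    static class ApplicationService {
        private final InMemoryEventStore eventStore;

        ApplicationService(InMemoryEventStore eventStore) {
            this.eventStore = eventStore;
        }

        void execute(String streamId, Function<Stream<String>, Stream<String>> functionThatCallsDomainModel) {
            // Read the history, call the pure function, write the new events at the version we read
            List<String> history = eventStore.read(streamId);
            Stream<String> newEvents = functionThatCallsDomainModel.apply(history.stream());
            eventStore.write(streamId, history.size(), newEvents);
        }
    }

    public static void main(String[] args) {
        InMemoryEventStore eventStore = new InMemoryEventStore();
        ApplicationService applicationService = new ApplicationService(eventStore);
        applicationService.execute("game1", events -> Stream.of("GameStarted"));
        applicationService.execute("game1", events -> Stream.of("WordGuessed after " + events.count() + " event(s)"));
        System.out.println(eventStore.read("game1"));
    }
}
```

Passing the version that was read back into the write is what lets the store reject a write when someone else has appended to the stream in the meantime.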

Command Composition

Many times it’s useful to compose multiple commands into a single unit-of-work for the same stream/aggregate. What this means is that you’ll “merge” several commands into one, and they will be executed in an atomic fashion. I.e. either all commands succeed, or all commands fail.

While you’re free to use any means and/or library to achieve this, Occurrent ships with a “command composition” library that you can leverage:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>command-composition</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:command-composition:0.19.8'
libraryDependencies += "org.occurrent" % "command-composition" % "0.19.8"
@Grab(group='org.occurrent', module='command-composition', version='0.19.8') 
[org.occurrent/command-composition "0.19.8"]
'org.occurrent:command-composition:jar:0.19.8'
<dependency org="org.occurrent" name="command-composition" rev="0.19.8" />

As an example consider this simple domain model:

public class WordGuessingGame {
    public static Stream<DomainEvent> startNewGame(Stream<DomainEvent> events, String gameId, String wordToGuess) {
        // Implementation
    }
    
    public static Stream<DomainEvent> guessWord(Stream<DomainEvent> events, String guess) {
        // Implementation
    }
}
object WordGuessingGame {
    fun startNewGame(events : Sequence<DomainEvent>, gameId : String, wordToGuess : String) : Sequence<DomainEvent> {
        // Implementation
    } 

    fun guessWord(events : Sequence<DomainEvent>, guess : String) : Sequence<DomainEvent> {
        // Implementation
    }    
}

Imagine that for a specific API you want to allow starting a new game and making a guess in the same request. Instead of changing your domain model, you can use function composition! If you import org.occurrent.application.composition.command.StreamCommandComposition.composeCommands you can do this:

String gameId = ...
String wordToGuess = ...
String guess = ...
applicationService.execute(gameId, composeCommands(
    events -> WordGuessingGame.startNewGame(events, gameId, wordToGuess),
    events -> WordGuessingGame.guessWord(events, guess) 
));
val gameId = ...
val wordToGuess = ...
val guess = ...
applicationService.execute(gameId, composeCommands(
    { events -> WordGuessingGame.startNewGame(events, gameId, wordToGuess) },
    { events -> WordGuessingGame.guessWord(events, guess) } 
))
If you're using commands that take and return a "java.util.List" instead of a Stream, you can instead statically import "composeCommands" from "org.occurrent.application.composition.command.ListCommandComposition". If you're using Kotlin you should import the "composeCommands" extension function from "org.occurrent.application.composition.command.composeCommands".

If you’re using Kotlin you can also make use of the andThen (infix) function for command composition (import org.occurrent.application.composition.command.andThen):

applicationService.execute(gameId,
    { events -> WordGuessingGame.startNewGame(events, gameId, wordToGuess) }
        andThen { events -> WordGuessingGame.guessWord(events, guess) })

Events returned from WordGuessingGame.startNewGame(..) will be appended to the event stream when calling WordGuessingGame.guessWord(..), and the new domain events returned by the two functions will be merged and written in an atomic fashion to the event store.
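
The merging behavior described above can be sketched in a few lines. Note that this is a hypothetical compose helper written for illustration; it is not how Occurrent's composeCommands is implemented. It uses lists instead of streams so the history can be read more than once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class CommandCompositionSketch {

    // Hypothetical helper (not Occurrent's implementation) that composes two list-based commands
    static <E> Function<List<E>, List<E>> compose(Function<List<E>, List<E>> first,
                                                   Function<List<E>, List<E>> second) {
        return history -> {
            List<E> newFromFirst = first.apply(history);
            // The second command sees the history plus the events produced by the first
            List<E> historySeenBySecond = new ArrayList<>(history);
            historySeenBySecond.addAll(newFromFirst);
            List<E> newFromSecond = second.apply(historySeenBySecond);
            // Only the new events are returned, merged in order, so they can be written in one go
            List<E> allNew = new ArrayList<>(newFromFirst);
            allNew.addAll(newFromSecond);
            return allNew;
        };
    }

    public static void main(String[] args) {
        Function<List<String>, List<String>> startNewGame = events -> List.of("GameStarted");
        Function<List<String>, List<String>> guessWord = events -> {
            if (!events.contains("GameStarted")) {
                throw new IllegalStateException("Game not started");
            }
            return List.of("WordGuessed");
        };
        // prints [GameStarted, WordGuessed]
        System.out.println(compose(startNewGame, guessWord).apply(List.of()));
    }
}
```

If the second function throws, nothing is returned to the application service and therefore nothing is written, which is where the all-or-nothing behavior comes from in this sketch.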

The command composition library also contains some utilities for partial function application that you can use to further enhance the example above (if you like). If you statically import the partial method from org.occurrent.application.composition.command.partial.PartialFunctionApplication you can refactor the code above into this:

String gameId = ...
String wordToGuess = ...
String guess = ...
applicationService.execute(gameId, composeCommands(
    partial(WordGuessingGame::startNewGame, gameId, wordToGuess),
    partial(WordGuessingGame::guessWord, guess)
));
val gameId = ...
val wordToGuess = ...
val guess = ...
applicationService.execute(gameId, composeCommands(
    WordGuessingGame::startNewGame.partial(gameId, wordToGuess),
    WordGuessingGame::guessWord.partial(guess) 
))
If you're using Kotlin, import the "partial" extension function from "org.occurrent.application.composition.command.partial".

With Kotlin, you can also use andThen (described above) to do:

applicationService.execute(gameId, 
    WordGuessingGame::startNewGame.partial(gameId, wordToGuess)
            andThen WordGuessingGame::guessWord.partial(guess))
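
If partial application is new to you, the idea can be sketched with a hypothetical helper like the one below. It only covers the two-argument case and is not Occurrent's PartialFunctionApplication class; the guessWord stand-in and the string events are invented for the example.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

public class PartialApplicationSketch {

    // Hypothetical helper: fixes the last argument of a two-argument command so that
    // what remains is a function from the event history to the new events
    static <E, A> Function<List<E>, List<E>> partial(BiFunction<List<E>, A, List<E>> command, A arg) {
        return events -> command.apply(events, arg);
    }

    // Stand-in domain function with the same shape as guessWord(events, guess)
    static List<String> guessWord(List<String> events, String guess) {
        return List.of("WordGuessed:" + guess);
    }

    public static void main(String[] args) {
        Function<List<String>, List<String>> command = partial(PartialApplicationSketch::guessWord, "house");
        // prints [WordGuessed:house]
        System.out.println(command.apply(List.of("GameStarted")));
    }
}
```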

Command Conversion

If you have an application service that takes a higher-order function in the form of Function<Stream<DomainEvent>, Stream<DomainEvent>> but your domain model is defined with lists (Function<List<DomainEvent>, List<DomainEvent>>) then Occurrent provides means to easily convert between them. First you need to depend on the Command Composition library:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>command-composition</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:command-composition:0.19.8'
libraryDependencies += "org.occurrent" % "command-composition" % "0.19.8"
@Grab(group='org.occurrent', module='command-composition', version='0.19.8') 
[org.occurrent/command-composition "0.19.8"]
'org.occurrent:command-composition:jar:0.19.8'
<dependency org="org.occurrent" name="command-composition" rev="0.19.8" />

Let’s say you have a domain model defined like this:

public class WordGuessingGame {
    public static List<DomainEvent> guessWord(List<DomainEvent> events, String guess) {
        // Implementation
    }
}
object WordGuessingGame {
    fun guessWord(events : List<DomainEvent>, guess : String) : List<DomainEvent> {
        // Implementation
    }    
}

But your application service takes a Function<Stream<DomainEvent>, Stream<DomainEvent>>:

public class ApplicationService {
    public void execute(String streamId, Function<Stream<DomainEvent>, Stream<DomainEvent>> functionThatCallsDomainModel) {
        // Implementation
    }
}

Then you can make use of the toStreamCommand in org.occurrent.application.composition.command.CommandConversion to call the domain function:

String guess = ...
applicationService.execute(gameId, toStreamCommand(events -> WordGuessingGame.guessWord(events, guess)));
val guess = ...
applicationService.execute(gameId, toStreamCommand { events -> WordGuessingGame.guessWord(events, guess) } )
You can also use the "toListCommand" method to convert a "Function<Stream<DomainEvent>, Stream<DomainEvent>>" into a "Function<List<DomainEvent>, List<DomainEvent>>"
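
Conceptually, both conversions just adapt the input and the output of the wrapped function. The sketch below shows one way this could be done with hypothetical helpers of the same names; it is an illustration under that assumption and not the code in Occurrent's CommandConversion class.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CommandConversionSketch {

    // Hypothetical equivalent of converting a list-based command into a stream-based one
    static <E> Function<Stream<E>, Stream<E>> toStreamCommand(Function<List<E>, List<E>> listCommand) {
        return stream -> listCommand.apply(stream.collect(Collectors.toList())).stream();
    }

    // Hypothetical equivalent of converting a stream-based command into a list-based one
    static <E> Function<List<E>, List<E>> toListCommand(Function<Stream<E>, Stream<E>> streamCommand) {
        return list -> streamCommand.apply(list.stream()).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Function<List<String>, List<String>> guessWord =
                events -> List.of("WordGuessed after " + events.size() + " event(s)");
        Stream<String> newEvents = toStreamCommand(guessWord).apply(Stream.of("GameStarted"));
        System.out.println(newEvents.collect(Collectors.toList()));
    }
}
```

One consequence worth keeping in mind is that converting a stream to a list loads the whole history into memory before the domain function is called.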

CloudEvent Conversion

To convert between domain events and cloud events you can use the cloud event converter API that’s shipped with Occurrent. This is optional, but components such as the application service and subscription DSL use a cloud event converter to function. If you’re only using an event store and subscriptions then you don’t need a cloud event converter (or you can roll your own). All cloud event converters implement the org.occurrent.application.converter.CloudEventConverter interface from the org.occurrent:cloudevent-converter-api module (see custom cloudevent converter).

Generic CloudEvent Converter

This is a really simple cloud event converter to which you can pass two higher-order functions that convert to and from domain events respectively. To use it, depend on:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>cloudevent-converter-generic</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:cloudevent-converter-generic:0.19.8'
libraryDependencies += "org.occurrent" % "cloudevent-converter-generic" % "0.19.8"
@Grab(group='org.occurrent', module='cloudevent-converter-generic', version='0.19.8') 
[org.occurrent/cloudevent-converter-generic "0.19.8"]
'org.occurrent:cloudevent-converter-generic:jar:0.19.8'
<dependency org="org.occurrent" name="cloudevent-converter-generic" rev="0.19.8" />

For example:

Function<CloudEvent, DomainEvent> convertCloudEventToDomainEventFunction = .. // You implement this function
Function<DomainEvent, CloudEvent> convertDomainEventToCloudEventFunction = .. // You implement this function
CloudEventConverter<DomainEvent> cloudEventConverter = new GenericCloudEventConverter<>(convertCloudEventToDomainEventFunction, convertDomainEventToCloudEventFunction);

If your domain model is already using a CloudEvent (and not a custom domain event) then you can just pass a Function.identity() to the GenericCloudEventConverter:

CloudEventConverter<CloudEvent> cloudEventConverter = new GenericCloudEventConverter<>(Function.identity(), Function.identity());

XStream CloudEvent Converter

This cloud event converter uses XStream to convert domain events to cloud events, with the data serialized as XML, and back. To use it, first depend on this module:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>cloudevent-converter-xstream</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:cloudevent-converter-xstream:0.19.8'
libraryDependencies += "org.occurrent" % "cloudevent-converter-xstream" % "0.19.8"
@Grab(group='org.occurrent', module='cloudevent-converter-xstream', version='0.19.8') 
[org.occurrent/cloudevent-converter-xstream "0.19.8"]
'org.occurrent:cloudevent-converter-xstream:jar:0.19.8'
<dependency org="org.occurrent" name="cloudevent-converter-xstream" rev="0.19.8" />

Next you can instantiate it like this:

XStream xStream = new XStream();
xStream.allowTypeHierarchy(MyDomainEvent.class);
URI cloudEventSource = URI.create("urn:company:domain");
XStreamCloudEventConverter<MyDomainEvent> cloudEventConverter = new XStreamCloudEventConverter<>(xStream, cloudEventSource);
val xStream = XStream().apply { allowTypeHierarchy(MyDomainEvent::class.java) }
val cloudEventSource = URI.create("urn:company:domain")
val cloudEventConverter = XStreamCloudEventConverter<MyDomainEvent>(xStream, cloudEventSource)

You can also configure how different attributes of the domain event should be represented in the cloud event by using the builder, new XStreamCloudEventConverter.Builder<MyDomainEvent>().. build().

Jackson CloudEvent Converter

This cloud event converter uses Jackson to convert domain events to cloud events, with the data serialized as JSON, and back. To use it, first depend on this module:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>cloudevent-converter-jackson</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:cloudevent-converter-jackson:0.19.8'
libraryDependencies += "org.occurrent" % "cloudevent-converter-jackson" % "0.19.8"
@Grab(group='org.occurrent', module='cloudevent-converter-jackson', version='0.19.8') 
[org.occurrent/cloudevent-converter-jackson "0.19.8"]
'org.occurrent:cloudevent-converter-jackson:jar:0.19.8'
<dependency org="org.occurrent" name="cloudevent-converter-jackson" rev="0.19.8" />

Next you can instantiate it like this:

ObjectMapper objectMapper = new ObjectMapper();
URI cloudEventSource = URI.create("urn:company:domain");
JacksonCloudEventConverter<MyDomainEvent> cloudEventConverter = new JacksonCloudEventConverter<>(objectMapper, cloudEventSource);
val objectMapper = jacksonObjectMapper()
val cloudEventSource = URI.create("urn:company:domain")
val cloudEventConverter = JacksonCloudEventConverter<MyDomainEvent>(objectMapper, cloudEventSource)

You can also configure how different attributes of the domain event should be represented in the cloud event by using the builder, new JacksonCloudEventConverter.Builder<MyDomainEvent>().. build(). In production, you almost certainly want to change the way the JacksonCloudEventConverter generates the cloud event type from the domain event. By default, the cloud event type will be generated from the fully-qualified class name of the domain event class type. I.e. if you do:

CloudEventConverter<MyDomainEvent> cloudEventConverter = new JacksonCloudEventConverter<>(objectMapper, cloudEventSource);
CloudEvent cloudEvent = cloudEventConverter.toCloudEvent(new SomeDomainEvent());

Then cloudEvent.getType() will return com.mycompany.SomeDomainEvent. Typically, you want to decouple the cloud event type from the fully-qualified name of the class. A better, but arguably still not optimal way, would be to make cloudEvent.getType() return SomeDomainEvent instead. The JacksonCloudEventConverter allows us to do this by using the builder:

CloudEventConverter<MyDomainEvent> cloudEventConverter = new JacksonCloudEventConverter.Builder<MyDomainEvent>()
        .typeMapper(..) // Specify a custom way to map the domain event to a cloud event and vice versa
        .build();

But when using Jackson, we can’t just configure the type mapper to return the “simple name” of the domain event class instead of the fully-qualified name. This is because there’s no generic way to derive the fully-qualified name from just the simple name. The fully-qualified name is needed in order for Jackson to map the cloud event back into a domain event. To work around this, you could implement your own type mapper (that you pass to the builder above) or create an instance of ReflectionCloudEventTypeMapper that knows how to convert the “simple name” cloud event type back into the domain event class. There are a couple of ways to do this; the simplest is probably this:

CloudEventTypeMapper<MyDomainEvent> typeMapper = ReflectionCloudEventTypeMapper.simple(MyDomainEvent.class);

This will create an instance of ReflectionCloudEventTypeMapper that uses the simple name of the domain event as cloud event type. But the crucial thing is that when deriving the domain event type from the cloud event, the ReflectionCloudEventTypeMapper will prepend the package name of the supplied domain event type (MyDomainEvent) to the cloud event type, thus reconstructing the fully-qualified name of the class. For this to work, all domain events must reside in exactly the same package as MyDomainEvent.

Another approach would be to supply a higher-order function that knows how to map the cloud event type back into a domain event class.

CloudEventTypeMapper<MyDomainEvent> typeMapper = ReflectionCloudEventTypeMapper.simple(cloudEventType -> ...);

Again, this will create an instance of ReflectionCloudEventTypeMapper that uses the simple name of the domain event as cloud event type, but you are responsible for mapping the cloud event type (cloudEventType) back into a domain event class.

If you don’t want to use reflection, or don’t want to couple the class name to the event name (keeping them decoupled is recommended), you can roll your own custom CloudEventTypeMapper by implementing the org.occurrent.application.converter.typemapper.CloudEventTypeMapper interface.
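
As an illustration of the decoupled approach, the sketch below keeps an explicit, two-way registry between cloud event type names and domain event classes. It's deliberately self-contained: ExplicitTypeMapper is a hypothetical stand-in that does not implement Occurrent's CloudEventTypeMapper interface, and its method names as well as the "game-started" style type names are assumptions made for this example.

```java
import java.util.HashMap;
import java.util.Map;

public class ExplicitTypeMapperSketch {

    // Hypothetical domain events for this sketch
    interface DomainEvent {}
    static class GameStarted implements DomainEvent {}
    static class WordGuessed implements DomainEvent {}

    // Hypothetical stand-in for a type mapper that uses an explicit registry instead of reflection
    static class ExplicitTypeMapper {
        private final Map<Class<? extends DomainEvent>, String> typeByClass = new HashMap<>();
        private final Map<String, Class<? extends DomainEvent>> classByType = new HashMap<>();

        ExplicitTypeMapper register(String cloudEventType, Class<? extends DomainEvent> domainEventClass) {
            typeByClass.put(domainEventClass, cloudEventType);
            classByType.put(cloudEventType, domainEventClass);
            return this;
        }

        String getCloudEventType(Class<? extends DomainEvent> domainEventClass) {
            String type = typeByClass.get(domainEventClass);
            if (type == null) {
                throw new IllegalArgumentException("Unmapped class: " + domainEventClass.getName());
            }
            return type;
        }

        Class<? extends DomainEvent> getDomainEventType(String cloudEventType) {
            Class<? extends DomainEvent> domainEventClass = classByType.get(cloudEventType);
            if (domainEventClass == null) {
                throw new IllegalArgumentException("Unmapped cloud event type: " + cloudEventType);
            }
            return domainEventClass;
        }
    }

    public static void main(String[] args) {
        ExplicitTypeMapper typeMapper = new ExplicitTypeMapper()
                .register("game-started", GameStarted.class)
                .register("word-guessed", WordGuessed.class);
        System.out.println(typeMapper.getCloudEventType(GameStarted.class));
        System.out.println(typeMapper.getDomainEventType("word-guessed").getSimpleName());
    }
}
```

With a registry like this, a domain event class can be renamed or moved to another package without changing the type stored in already persisted cloud events.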

Custom CloudEvent Converter

To create a custom cloud event converter first depend on:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>cloudevent-converter-api</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:cloudevent-converter-api:0.19.8'
libraryDependencies += "org.occurrent" % "cloudevent-converter-api" % "0.19.8"
@Grab(group='org.occurrent', module='cloudevent-converter-api', version='0.19.8') 
[org.occurrent/cloudevent-converter-api "0.19.8"]
'org.occurrent:cloudevent-converter-api:jar:0.19.8'
<dependency org="org.occurrent" name="cloudevent-converter-api" rev="0.19.8" />

Let’s have a look at a naive example of how we can create a custom converter that converts domain events to cloud events (and vice versa). This cloud event converter can then be used with the generic application service (the application service implementation provided by Occurrent) and other Occurrent components that require a CloudEventConverter. Note that instead of using the code below you might as well use the Jackson CloudEvent Converter; this is just an example showing how you could roll your own.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import org.occurrent.application.converter.CloudEventConverter;

import java.io.IOException;
import java.net.URI;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

import static java.time.ZoneOffset.UTC;

public class MyCloudEventConverter implements CloudEventConverter<DomainEvent> {

    private final ObjectMapper objectMapper;

    public MyCloudEventConverter(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    @Override
    public CloudEvent toCloudEvent(DomainEvent e) {
        try {
            return CloudEventBuilder.v1()
                    .withId(e.getEventId())
                    .withSource(URI.create("urn:myapplication:streamtype"))
                    .withType(getCloudEventType(e))
                    .withTime(LocalDateTime.ofInstant(e.getDate().toInstant(), UTC).atOffset(UTC)
                                           .truncatedTo(ChronoUnit.MILLIS))
                    .withSubject(e.getName())
                    .withDataContentType("application/json")
                    .withData(objectMapper.writeValueAsBytes(e))
                    .build();
        } catch (JsonProcessingException jsonProcessingException) {
            throw new RuntimeException(jsonProcessingException);
        }
    }

    @Override
    public DomainEvent toDomainEvent(CloudEvent cloudEvent) {
        try {
            return (DomainEvent) objectMapper.readValue(cloudEvent.getData().toBytes(), Class.forName(cloudEvent.getType()));
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
    
    @Override
    public String getCloudEventType(Class<? extends DomainEvent> type) {
        return type.getName();
    }
}        
While this implementation works for simple cases, make sure that you think before simply copying and pasting this class into your own code base. The reason is that you may not need to serialize all data in the domain event to the data field (some parts of the domain event, such as id and type, are already present in the cloud event), and the "type" field contains the fully-qualified name of the class, which makes it more difficult to move without losing backward compatibility. Also, your domain events might not be serializable to JSON without conversion. For these reasons, it's recommended to create a more custom mapping between a cloud event and domain event.

To see what the attributes mean in the context of event sourcing refer to the CloudEvents documentation. You can also have a look at GenericApplicationServiceTest.java for an actual code example.

Note that if the data content type in the CloudEvent is specified as “application/json” (or a JSON-compatible content type) then Occurrent will automatically store it as BSON in a MongoDB event store. The reason for this is so that you’re able to query the data, either by the EventStoreQueries API, or manually using MongoDB queries. In order to do this, the byte[] passed to withData will be converted into an org.bson.Document that is later written to the database. This is not optimal from a performance perspective. A more performant option would be to make use of the io.cloudevents.core.data.PojoCloudEventData class. This class implements the io.cloudevents.CloudEventData interface and allows passing a pre-baked Map or org.bson.Document instance to it. Then no additional conversion will need to take place! Here’s an example:

import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.CloudEventData;
import io.cloudevents.core.data.PojoCloudEventData;
import org.occurrent.application.converter.CloudEventConverter;

import java.io.IOException;
import java.net.URI;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

import static java.time.ZoneOffset.UTC;
import static java.time.temporal.ChronoUnit.MILLIS;

public class MyCloudEventConverter implements CloudEventConverter<DomainEvent> {
    
    private final ObjectMapper objectMapper;
    
    public MyCloudEventConverter(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    } 

    @Override
    public CloudEvent toCloudEvent(DomainEvent e) {  
            // Convert the data in the domain event into a Map
            Map<String, Object> eventData = convertDataInDomainEventToMap(e);
            return CloudEventBuilder.v1()
                    .withId(e.getEventId())
                    .withSource(URI.create("http://name"))
                    .withType(getCloudEventType(e))
                    .withTime(LocalDateTime.ofInstant(e.getDate().toInstant(), UTC).atOffset(UTC)
                                           .truncatedTo(MILLIS))
                    .withSubject(e.getName())
                    .withDataContentType("application/json")  
                    // Use the "eventData" map to create an instance of PojoCloudEventData.
                    // If an event store implementation doesn't know how to handle "Map" data,
                    // it'll call the higher-order function that converts the map into byte[] 
                    // (objectMapper::writeValueAsBytes), that it _has_ to understand.
                    // But since all Occurrent event stores currently knows how to handle maps, 
                    // the objectMapper::writeValueAsBytes method will never be called.
                    .withData(PojoCloudEventData.wrap(eventData, objectMapper::writeValueAsBytes))
                    .build();
    }
    
    @Override
    public DomainEvent toDomainEvent(CloudEvent cloudEvent) {
        CloudEventData cloudEventData = cloudEvent.getData();
        if (cloudEventData instanceof PojoCloudEventData && ((PojoCloudEventData<?>) cloudEventData).getValue() instanceof Map) {
            Map<String, Object> eventData = ((PojoCloudEventData<Map<String, Object>>) cloudEventData).getValue();
            return convertToDomainEvent(cloudEvent, eventData);
        } else {
            return objectMapper.readValue(cloudEventData.toBytes(), DomainEvent.class); // try-catch omitted
        }
    }
    
    @Override
    public String getCloudEventType(Class<? extends DomainEvent> type) {
        return type.getSimpleName();
    }
    
    private static Map<String, Object> convertDataInDomainEventToMap(DomainEvent e) {
        // Convert the domain event into a Map                
        Map<String, Object> data = new HashMap<String, Object>();
        if (e instanceof GameStarted) {
           data.put("type", "GameStarted"); 
           // Put the rest of the values
        } else if (...) {
            // More events
        }
        return data;
    }

    private static DomainEvent convertToDomainEvent(CloudEvent cloudEvent, Map<String, Object> data) {
        // Re-construct the domain event instance from the cloud event and data
        switch ((String) data.get("type")) {
            case "GameStarted":
                // Convert map to GameStartedEvent and return it
                ...
            ...
        }
    }
}        
Tip: Instead of working directly with maps, you can use Jackson to convert a DTO into a Map, by calling "jackson.convertValue(myDTO, new TypeReference<Map<String, Object>>() {});".

Application Service

Occurrent provides a generic application service that is a good starting point for most use cases. First add the module as a dependency to your project:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>application-service-blocking</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:application-service-blocking:0.19.8'
libraryDependencies += "org.occurrent" % "application-service-blocking" % "0.19.8"
@Grab(group='org.occurrent', module='application-service-blocking', version='0.19.8') 
[org.occurrent/application-service-blocking "0.19.8"]
'org.occurrent:application-service-blocking:jar:0.19.8'
<dependency org="org.occurrent" name="application-service-blocking" rev="0.19.8" />

This module provides an interface, org.occurrent.application.service.blocking.ApplicationService, and a default implementation, org.occurrent.application.service.blocking.implementation.GenericApplicationService. The GenericApplicationService takes an EventStore and an org.occurrent.application.converter.CloudEventConverter implementation as parameters. The latter is used to convert domain events to and from cloud events when they are read from or written to the event store. There’s a default implementation that you may decide to use, called org.occurrent.application.converter.implementation.GenericCloudEventConverter, available in the org.occurrent:cloudevent-converter-generic module. You can see an example in the next section.

As of version 0.11.0, the GenericApplicationService also takes a RetryStrategy as an optional third parameter.
By default, the retry strategy uses exponential backoff, starting at 100 ms and progressively increasing to a maximum wait time of 2 seconds between retries, if a WriteConditionNotFulfilledException is caught (see write condition docs). It will, again by default, only retry 5 times before giving up and rethrowing the original exception. You can override the default strategy by calling new GenericApplicationService(eventStore, cloudEventConverter, retryStrategy). Use new GenericApplicationService(eventStore, cloudEventConverter, RetryStrategy.none()) to disable retry. This is also useful if you want to use another retry library.
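
To make the default timings concrete, here is a minimal sketch that lists the wait time before each retry. It is plain Java and does not use Occurrent's RetryStrategy. The class name BackoffSketch and the multiplier of 2 are assumptions made for illustration; the text above only states the 100 ms starting point, the 2 second cap and the limit of 5 retries.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Occurrent: lists the wait time before each retry.
class BackoffSketch {

    // initialMillis: wait before the first retry
    // maxMillis:     upper bound for any single wait
    // multiplier:    growth factor between retries (an assumption, not stated in the docs)
    // retries:       number of retries before giving up
    static List<Long> delays(long initialMillis, long maxMillis, double multiplier, int retries) {
        List<Long> result = new ArrayList<>();
        double current = initialMillis;
        for (int i = 0; i < retries; i++) {
            // Never wait longer than the configured maximum
            result.add(Math.min((long) current, maxMillis));
            current *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(delays(100, 2000, 2.0, 5)); // prints [100, 200, 400, 800, 1600]
    }
}
```

Under these assumptions the cap of 2 seconds would only be reached if more than 5 retries were allowed.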

Using the Application Service

Now you can instantiate the (blocking) GenericApplicationService:

EventStore eventStore = ..
CloudEventConverter<DomainEvent> cloudEventConverter = ..
ApplicationService<DomainEvent> applicationService = new GenericApplicationService<>(eventStore, cloudEventConverter);
val eventStore : EventStore = ..
val cloudEventConverter : CloudEventConverter<DomainEvent> = ..
val applicationService : ApplicationService<DomainEvent> = GenericApplicationService(eventStore, cloudEventConverter)

You’re now ready to use the generic application service in your application.

If you're using Deciders, refer to the docs here.

As an example let’s say you have a domain model with a method defined like this:

public class WordGuessingGame {
    public static Stream<DomainEvent> guessWord(Stream<DomainEvent> events, String guess) {
        // Implementation
    }    
}

You can call it using the application service:

applicationService.execute(gameId, events -> WordGuessingGame.guessWord(events, guess));
applicationService.execute(gameId) { events -> 
    WordGuessingGame.guessWord(events, guess)
}

Application Service Side-Effects

The GenericApplicationService supports executing side-effects after the events returned from the domain model have been written to the event store. This is useful if you need to, for example, update a view synchronously after events have been written to the event store. Note that to perform side-effects (or policies) asynchronously you should use a subscription. As an example, consider that you want to synchronously register a game as ongoing when it is started. It may be defined like this:

public class RegisterOngoingGame {
    private final DatabaseApi someDatabaseApi;
    
    public RegisterOngoingGame(DatabaseApi someDatabaseApi) {
        this.someDatabaseApi = someDatabaseApi;
    }

    public void registerGameAsOngoingWhenGameWasStarted(GameWasStarted event) {
        // Add the id of the game started event to a set to handle duplicates and idempotency.
        someDatabaseApi.addToSet("ongoingGames", Map.of("gameId", event.gameId(), "date", event.getDate()));                
    }
}
class RegisterOngoingGame(private val someDatabaseApi : DatabaseApi) {
    fun registerGameAsOngoingWhenGameWasStarted(event : GameWasStarted) {
        // Add the id of the game started event to a set to handle duplicates and idempotency.
        someDatabaseApi.addToSet("ongoingGames", Map.of("gameId", event.gameId(), "date", event.getDate()));                
    }
}

The reason for doing this synchronously is, for example, if you have a REST API and the player expects the ongoing games view to be updated once the “start game” command has executed. This can be achieved by other means (RSocket, Websockets, server-sent events, polling) but synchronous updates are simple and work quite well in many cases.

Now that we have the code that registers ongoing games, we can call it from the application service:

RegisterOngoingGame registerOngoingGame = ..
applicationService.execute(gameId, events -> WordGuessingGame.guessWord(events, guess), events -> {
    events.filter(event -> event instanceof GameWasStarted)
         .findFirst()
         .map(GameWasStarted.class::cast)
         .ifPresent(registerOngoingGame::registerGameAsOngoingWhenGameWasStarted);
});
val registerOngoingGame : RegisterOngoingGame = ..
applicationService.execute(gameId, { events -> WordGuessingGame.guessWord(events, guess) }) { events -> 
    val gameWasStarted = events.filterIsInstance<GameWasStarted>().firstOrNull()
    if(gameWasStarted != null) {
        registerOngoingGame.registerGameAsOngoingWhenGameWasStarted(gameWasStarted) 
    }
}

Voila! Now registerGameAsOngoingWhenGameWasStarted will be called after the events returned from WordGuessingGame.guessWord(..) are written to the event store. You can, however, improve on the code above by using the executePolicy method shipped with the application service in org.occurrent.application.service.blocking.PolicySideEffect. The code can then be refactored to this:

applicationService.execute(gameId, events -> WordGuessingGame.guessWord(events, guess), executePolicy(GameWasStarted.class, registerOngoingGame::registerGameAsOngoingWhenGameWasStarted)); 

If using the Kotlin extension functions (org.occurrent.application.service.blocking.executePolicy) you can write the code like this:

applicationService.execute(gameId, { events -> WordGuessingGame.guessWord(events, guess) }, executePolicy<GameWasStarted>(registerOngoingGame::registerGameAsOngoingWhenGameWasStarted))

Policies can also be composed:

RegisterOngoingGame registerOngoingGame  = ..
RemoveFromOngoingGamesWhenGameEnded removeFromOngoingGamesWhenGameEnded = ..

applicationService.execute(gameId, events -> WordGuessingGame.guessWord(events, guess), 
                           executePolicy(GameWasStarted.class, registerOngoingGame::registerGameAsOngoingWhenGameWasStarted)
                            .andThenExecuteAnotherPolicy(removeFromOngoingGamesWhenGameEnded::removeFromOngoingGamesWhenGameEnded));
val registerOngoingGame : RegisterOngoingGame = ..
val removeFromOngoingGamesWhenGameEnded : RemoveFromOngoingGamesWhenGameEnded = ..

applicationService.execute(gameId, { events -> WordGuessingGame.guessWord(events, guess) }, 
                            executePolicies(registerOngoingGame::registerGameAsOngoingWhenGameWasStarted, 
                                            removeFromOngoingGamesWhenGameEnded::removeFromOngoingGamesWhenGameEnded))
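
To illustrate what a type-filtered, composable policy amounts to, here is a minimal sketch in plain Java. It is not Occurrent's PolicySideEffect implementation: the names PolicySketch and policyFor, and the empty event classes, are hypothetical, and the sketch works on a List rather than a Stream so that the events can be traversed once per policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch, not Occurrent's PolicySideEffect.
class PolicySketch {

    // Illustrative event types
    interface DomainEvent {}
    static class GameWasStarted implements DomainEvent {}
    static class GameEnded implements DomainEvent {}

    // Returns a side-effect that forwards every event of the given type to the policy
    // and ignores all other events.
    static <T> Consumer<List<?>> policyFor(Class<T> type, Consumer<T> policy) {
        return events -> events.stream()
                .filter(type::isInstance)
                .map(type::cast)
                .forEach(policy);
    }

    // Composes two policies and runs them against the same list of new events.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Consumer<List<?>> sideEffect =
                policyFor(GameWasStarted.class, event -> log.add("registered"))
                        .andThen(policyFor(GameEnded.class, event -> log.add("removed")));
        sideEffect.accept(List.of(new GameWasStarted(), new GameEnded()));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [registered, removed]
    }
}
```

Composition here relies on Consumer.andThen from the JDK: the first policy sees all events before the second one runs.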

Application Service Transactional Side-Effects

In the example above, writing the events to the event store and executing policies is not an atomic operation. If your app crashes after the call to registerOngoingGame::registerGameAsOngoingWhenGameWasStarted but before removeFromOngoingGamesWhenGameEnded::removeFromOngoingGamesWhenGameEnded, you will need to handle idempotency. But if your policies/side-effects are writing data to the same database as the event store you can make use of transactions to write everything atomically! This is very easy if you’re using a Spring EventStore. What you need to do is to wrap the ApplicationService provided by Occurrent in your own application service, something like this:

@Service
public class CustomApplicationServiceImpl implements ApplicationService<DomainEvent> {
	private final GenericApplicationService<DomainEvent> occurrentApplicationService;

	public CustomApplicationServiceImpl(GenericApplicationService<DomainEvent> occurrentApplicationService) {
		this.occurrentApplicationService = occurrentApplicationService;
	}

	@Transactional
	@Override
    public void execute(String gameId, Function<Stream<DomainEvent>, Stream<DomainEvent>> functionThatCallsDomainModel, Consumer<Stream<DomainEvent>> sideEffect) {
		occurrentApplicationService.execute(gameId, functionThatCallsDomainModel, sideEffect);
    }
}
@Service
class CustomApplicationServiceImpl(val occurrentApplicationService:  GenericApplicationService<DomainEvent>) : ApplicationService<DomainEvent> {

	@Transactional
    override fun execute(gameId : String, functionThatCallsDomainModel: Function<Stream<DomainEvent>, Stream<DomainEvent>> , sideEffect : Consumer<Stream<DomainEvent>>) {
		occurrentApplicationService.execute(gameId, functionThatCallsDomainModel, sideEffect)
    }
}

Given that you’ve defined a MongoTransactionManager in your Spring Boot configuration (and are using it when creating your event store instance), the side-effects and events are written atomically in the same transaction!

Application Service Kotlin Extensions

If you’re using Kotlin, chances are that your domain model is using a Sequence instead of a Java Stream:

object WordGuessingGame {
    fun guessWord(events : Sequence<DomainEvent>, guess : String) : Sequence<DomainEvent> {
        // Implementation
    }    
}

Occurrent provides a set of extension functions for Kotlin in the application service module:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>application-service-blocking</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:application-service-blocking:0.19.8'
libraryDependencies += "org.occurrent" % "application-service-blocking" % "0.19.8"
@Grab(group='org.occurrent', module='application-service-blocking', version='0.19.8') 
[org.occurrent/application-service-blocking "0.19.8"]
'org.occurrent:application-service-blocking:jar:0.19.8'
<dependency org="org.occurrent" name="application-service-blocking" rev="0.19.8" />

You can then use one of the org.occurrent.application.service.blocking.execute extension functions to do:

applicationService.execute(gameId) { events : Sequence<DomainEvent> -> 
    WordGuessingGame.guessWord(events, guess)
}

Sagas

A “saga” can be used to represent and coordinate a long-lived business transaction/process (where “long-lived” is kind of arbitrary). This is an advanced subject and you should try to avoid sagas if there are other means available to solve the problem (for example use policies if they are sufficient). Occurrent doesn’t provide or enforce any specific Saga implementation. But since Occurrent is a library you can hook in already existing solutions, for example:

The way to integrate Occurrent with any of these libraries/frameworks/patterns is to create a subscription that forwards the events written to the event store to the preferred library/framework/view.

Policy

A policy (aka reaction/trigger) can be used to deal with workflows such as “whenever this happens, do that”. For example, whenever a game is won, send an email to the winner. For simple workflows like this there’s typically no need for a full-fledged saga.

Asynchronous Policy

In Occurrent, you can create asynchronous policies by creating a subscription. Let’s consider the example above:

public class WhenGameWonThenSendEmailToWinnerPolicy {

    private final SubscriptionModel subscriptionModel;
    private final EmailClient emailClient;
    private final Players players;
    
    public WhenGameWonThenSendEmailToWinnerPolicy(SubscriptionModel subscriptionModel, EmailClient emailClient, Players players) {
        this.subscriptionModel = subscriptionModel;
        this.emailClient = emailClient;
        this.players = players;
    }
    
    @PostConstruct
    public void whenGameWonThenSendEmailToWinner() {
        subscriptionModel.subscribe("whenGameWonThenSendEmailToWinnerPolicy", filter(type("GameWon")), cloudEvent -> {
            String playerEmailAddress =  players.whatIsTheEmailAddressOfPlayer(playerIdIn(cloudEvent.getData()));
            emailClient.sendEmail(playerEmailAddress, "You won, yaay!");
        });
    }
}
class WhenGameWonThenSendEmailToWinnerPolicy(val subscriptionModel : SubscriptionModel, 
                                             val emailClient : EmailClient, val players : Players) {

    @PostConstruct
    fun whenGameWonThenSendEmailToWinner() = 
        subscriptionModel.subscribe("whenGameWonThenSendEmailToWinnerPolicy", filter(type("GameWon"))) { cloudEvent -> 
            val playerEmailAddress =  players.whatIsTheEmailAddressOfPlayer(playerIdIn(cloudEvent.getData()))
            emailClient.sendEmail(playerEmailAddress, "You won, yaay!")
        }          
}

You could also create a generic policy that simply forwards all events to another piece of infrastructure. For example, you may wish to forward all events to RabbitMQ (by publishing them) or Spring’s event infrastructure, and then create policies that subscribe to events from these systems instead. There’s an example in the GitHub repository that shows how one can achieve this.

You may also want to look into the “todo-list” pattern described in the automation section on the event modeling website.

Synchronous Policy

In some cases, for example if you have a simple website and you want views to be updated when a command is dispatched by a REST API, it can be useful to execute a policy in a synchronous fashion. The application service provided by Occurrent allows for this; please see the application service documentation for an example.

Snapshots

Using snapshots is an advanced technique and it shouldn't be used unless it's really necessary.

Snapshotting is an optimization technique that can be applied if it takes too long to derive the current state from an event stream for each command. There are several ways to do this and Occurrent doesn’t enforce any particular strategy. One strategy is to use so-called “snapshot events” (special events that contain a pre-calculated view of the state of an event stream at a particular time) and another technique is to write snapshots to a datastore other than the event store.

The application service needs to be modified to first load the snapshot and then load the events that have not yet been materialized in the snapshot (if any).
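
The idea of combining a snapshot with the events that came after it can be sketched in plain Java as follows. This is not an Occurrent API: the Snapshot and MoneyDeposited classes and the balance domain are hypothetical, and the sketch filters an in-memory list where a real application service would only read the newer events from the event store.

```java
import java.util.List;

// Hypothetical sketch: rebuild the current state from a snapshot plus newer events.
class SnapshotSketch {

    // A snapshot stores the pre-calculated state and the stream version it was taken at.
    static final class Snapshot {
        final long version;
        final int balance;

        Snapshot(long version, int balance) {
            this.version = version;
            this.balance = balance;
        }
    }

    // An event together with its position (stream version) in the event stream.
    static final class MoneyDeposited {
        final long streamVersion;
        final int amount;

        MoneyDeposited(long streamVersion, int amount) {
            this.streamVersion = streamVersion;
            this.amount = amount;
        }
    }

    // Start from the snapshot state and apply only events newer than the snapshot.
    static int currentBalance(Snapshot snapshot, List<MoneyDeposited> events) {
        int sumOfNewerEvents = events.stream()
                .filter(event -> event.streamVersion > snapshot.version)
                .mapToInt(event -> event.amount)
                .sum();
        return snapshot.balance + sumOfNewerEvents;
    }

    static int demo() {
        List<MoneyDeposited> events = List.of(
                new MoneyDeposited(1, 10), new MoneyDeposited(2, 20),
                new MoneyDeposited(3, 5), new MoneyDeposited(4, 7));
        // The snapshot was taken at version 2, so it already contains 10 + 20
        Snapshot snapshot = new Snapshot(2, 30);
        return currentBalance(snapshot, events);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 42
    }
}
```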

Synchronous Snapshots

With Occurrent, you can trade off write speed for understandability. For example, let’s say that you want to update the snapshot on every write and it should be consistent with the writes to the event store. One way to do this is to use Spring’s transactional support:

public class ApplicationService {

    private final EventStore eventStore;
    private final SnapshotRepository snapshotRepository;
    
    public ApplicationService(EventStore eventStore,  SnapshotRepository snapshotRepository) {
        this.eventStore = eventStore;
        this.snapshotRepository = snapshotRepository;
    }
    
    @Transactional
    public void execute(String streamId, BiFunction<Snapshot, Stream<CloudEvent>, Stream<CloudEvent>> functionThatCallsDomainModel) {
        // Read the snapshot from the snapshot repository 
        Snapshot snapshot = snapshotRepository.findByStreamId(streamId);
        long snapshotVersion = snapshot.version();        
    
        // Read all events for "streamId" from snapshotVersion  
        EventStream<CloudEvent> eventStream = eventStore.read(streamId, snapshotVersion, Long.MAX_VALUE);

        // Call a pure function from the domain model which returns a Stream of new events  
        List<CloudEvent> newEvents = functionThatCallsDomainModel.apply(snapshot.state(), eventStream.events()).collect(Collectors.toList());

        // Convert domain events to cloud events and write them to the event store  
        eventStore.write(streamId, eventStream.version(), newEvents.stream());
        
        // Update the snapshot
        Snapshot updatedSnapshot = snapshot.updateFrom(newEvents.stream(), eventStream.version());
        snapshotRepository.save(updatedSnapshot);
    }
}
class ApplicationService(val eventStore : EventStore, val snapshotRepository : SnapshotRepository) {
    
    @Transactional
    fun execute(streamId : String, functionThatCallsDomainModel : (Snapshot, Stream<CloudEvent>) -> Stream<CloudEvent>) {
        // Read the snapshot from the snapshot repository 
        val snapshot : Snapshot = snapshotRepository.findByStreamId(streamId)
        val snapshotVersion = snapshot.version()        
    
        // Read all events for "streamId" from snapshotVersion  
        val eventStream = eventStore.read(streamId, snapshotVersion, Long.MAX_VALUE)

        // Call a pure function from the domain model which returns a Stream of new events  
        val newEvents = functionThatCallsDomainModel(snapshot.state(), eventStream.events()).collect(Collectors.toList())

        // Convert domain events to cloud events and write them to the event store  
        eventStore.write(streamId, eventStream.version(), newEvents.stream())
        
        // Update the snapshot
        val updatedSnapshot = snapshot.updateFrom(newEvents.stream(), eventStream.version())
        snapshotRepository.save(updatedSnapshot)
    }
}
This is a somewhat simplified example but the idea is hopefully made clear.

It’s quite likely that you don’t need to update the snapshot on every write (as is done in the example above). If you only wish to update the snapshot for every third event then you could do:

if (eventStream.version() - snapshot.version() >= 3) {
    Snapshot updatedSnapshot = snapshot.updateFrom(newEvents.stream(), eventStream.version());
    snapshotRepository.save(updatedSnapshot);
}
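
As a worked example of the check above, the following plain Java sketch lists the stream versions at which a snapshot would be saved. The class and method names are hypothetical, and it assumes that exactly one event is written per command and that no snapshot exists at the start (version 0).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: at which stream versions does the "at least N versions behind"
// check trigger a snapshot update?
class SnapshotIntervalSketch {

    static List<Long> versionsWhereSnapshotIsSaved(long lastStreamVersion, long interval) {
        List<Long> saved = new ArrayList<>();
        long snapshotVersion = 0; // no snapshot yet
        for (long streamVersion = 1; streamVersion <= lastStreamVersion; streamVersion++) {
            if (streamVersion - snapshotVersion >= interval) {
                snapshotVersion = streamVersion; // the snapshot now reflects this version
                saved.add(streamVersion);
            }
        }
        return saved;
    }

    public static void main(String[] args) {
        System.out.println(versionsWhereSnapshotIsSaved(10, 3)); // prints [3, 6, 9]
    }
}
```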

Asynchronous Snapshots

As an alternative to synchronous and fully-consistent snapshots, you can update snapshots asynchronously. You do this by creating a subscription that updates the snapshot. For example:

public class UpdateSnapshotWhenNewEventsAreWrittenToEventStore {

    private final SubscriptionModel subscriptionModel;
    private final SnapshotRepository snapshotRepository;
    
    public UpdateSnapshotWhenNewEventsAreWrittenToEventStore(SubscriptionModel subscriptionModel, SnapshotRepository snapshotRepository) {
        this.subscriptionModel = subscriptionModel;
        this.snapshotRepository = snapshotRepository;
    }
    
    @PostConstruct
    public void updateSnapshotWhenNewEventsAreWrittenToEventStore() {
        subscriptionModel.subscribe("updateSnapshots", cloudEvent -> {
            String streamId = OccurrentExtensionGetter.getStreamId(cloudEvent);
            long streamVersion = OccurrentExtensionGetter.getStreamVersion(cloudEvent);
            
            Snapshot snapshot = snapshotRepository.findByStreamId(streamId);        
            snapshot.updateFrom(cloudEvent, streamVersion);
            snapshotRepository.save(snapshot);
        });
    }
}
class UpdateSnapshotWhenNewEventsAreWrittenToEventStore(val subscriptionModel : SubscriptionModel, 
                                                        val snapshotRepository : SnapshotRepository) {

    @PostConstruct
    fun updateSnapshotWhenNewEventsAreWrittenToEventStore() {
        subscriptionModel.subscribe("updateSnapshots") { cloudEvent -> 
            val streamId = OccurrentExtensionGetter.getStreamId(cloudEvent)
            val streamVersion = OccurrentExtensionGetter.getStreamVersion(cloudEvent)
            
            val snapshot = snapshotRepository.findByStreamId(streamId)        
            snapshot.updateFrom(cloudEvent, streamVersion)
            snapshotRepository.save(snapshot)
        }          
    }
}

It’s quite likely that you don’t need to update the snapshot on every write (as is done in the example above). If you only wish to update the snapshot for every third event then you could do:

if (streamVersion - snapshot.version() >= 3) {
    Snapshot updatedSnapshot = snapshot.updateFrom(newEvents.stream(), eventStream.version());
    snapshotRepository.save(updatedSnapshot);
}

Closing the Books

This is a pattern that can be applied instead of updating snapshots every n events. The idea is to try to keep event streams short and instead create snapshots periodically. For example, once every month we run a job that creates snapshots for certain event streams. This is especially well suited for problem domains where “closing the books” is a concept in the domain (such as accounting).

Deadlines

Deadlines (aka scheduling, alarm clock) are a very handy technique for scheduling something to be executed in the future. Imagine, for example, a multiplayer game (like the word guessing game shown in previous examples), where we want the game to end automatically after 10 hours of inactivity. This means that as soon as a player has made a guess, we’d like to schedule a “timeout game command” to be executed after 10 hours.

The way it works in Occurrent is that you schedule a Deadline (org.occurrent.deadline.api.blocking.Deadline) using a DeadlineScheduler (org.occurrent.deadline.api.blocking.DeadlineScheduler) implementation. The Deadline is a date/time in the future when the deadline is up. To handle the deadline, you also register a DeadlineConsumer (org.occurrent.deadline.api.blocking.DeadlineConsumer) to a DeadlineConsumerRegistry (org.occurrent.deadline.api.blocking.DeadlineConsumerRegistry) implementation, and it’ll be invoked when a deadline is up. For example:

// In some method we schedule a deadline two hours from now with data "hello world" 
var deadlineId = UUID.randomUUID(); 
var deadlineCategory = "hello-world"; 
var deadline = Deadline.afterHours(2);
deadlineScheduler.schedule(deadlineId, deadlineCategory, deadline, "hello world");

// In some other method, during application startup, we register a deadline consumer to the registry for the "hello-world" deadline category
deadlineConsumerRegistry.register("hello-world", (deadlineId, deadlineCategory, deadline, data) -> System.out.println(data));

In the example above, the deadline consumer will print “hello world” after 2 hours.

There are two implementations of DeadlineScheduler and DeadlineConsumerRegistry, JobRunr and InMemory.

JobRunr Deadline Scheduler

This is a persistent (meaning that your application can be restarted and deadlines are still around) DeadlineScheduler based on JobRunr. To get started, depend on:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>deadline-jobrunr</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:deadline-jobrunr:0.19.8'
libraryDependencies += "org.occurrent" % "deadline-jobrunr" % "0.19.8"
@Grab(group='org.occurrent', module='deadline-jobrunr', version='0.19.8') 
[org.occurrent/deadline-jobrunr "0.19.8"]
'org.occurrent:deadline-jobrunr:jar:0.19.8'
<dependency org="org.occurrent" name="deadline-jobrunr" rev="0.19.8" />

You then need to create an instance of org.jobrunr.scheduling.JobRequestScheduler (see JobRunr documentation for different ways of doing this). Once you have a JobRequestScheduler you need to create an instance of org.occurrent.deadline.jobrunr.JobRunrDeadlineScheduler:

JobRequestScheduler jobRequestScheduler = ..
DeadlineScheduler deadlineScheduler = new JobRunrDeadlineScheduler(jobRequestScheduler); 

You also need to create an instance of org.occurrent.deadline.jobrunr.JobRunrDeadlineConsumerRegistry:

DeadlineConsumerRegistry deadlineConsumerRegistry = new JobRunrDeadlineConsumerRegistry();  

You register so-called “deadline consumers” to the DeadlineConsumerRegistry for a certain “category” (see example above). A deadline consumer will be invoked once a deadline is up. Note that you can only have one deadline consumer instance per category. You want to register your deadline consumer every time your application starts up. If you’re using Spring, you can, for example, do this using a @PostConstruct method:

@PostConstruct
void registerDeadlineConsumersOnApplicationStart() {
    deadlineConsumerRegistry.register("CancelPayment", (id, category, deadline, data) -> {
        CancelPayment cancelPayment = (CancelPayment) data; 
        paymentApi.cancel(cancelPayment.getPaymentId());    
    });
}

The example above will register a deadline consumer for the “CancelPayment” category and call an imaginary API (paymentApi) to cancel the payment. The deadline consumer will be called by Occurrent when the scheduled deadline is up. Here’s an example of how you can schedule this deadline using the JobRunrDeadlineScheduler, but first let’s see what CancelPayment looks like:

public class CancelPayment {
    private String paymentId;

    CancelPayment() {
    }

    public CancelPayment(String paymentId) {
        this.paymentId = paymentId;
    }
    
    public void setPaymentId(String paymentId) {
        this.paymentId = paymentId;
    }

    public String getPaymentId() {
        return paymentId;
    }
}

As you can see it’s a regular Java POJO. This is very important since JobRunr needs to serialize/de-serialize this class to the database. Typically, this is done using Jackson (so it’s fine to use Jackson annotations etc), but JobRunr has support for other mappers as well.

Now let’s see how we can schedule a “CancelPayment”:

String paymentId = ...
deadlineScheduler.schedule(UUID.randomUUID(), "CancelPayment", Deadline.afterWeeks(2), new CancelPayment(paymentId));

This will schedule a deadline after 2 weeks that’ll be picked up by the deadline consumer registered in the registerDeadlineConsumersOnApplicationStart method. Note that in this case, the class (CancelPayment) has the same name as the category, but this is not required.

Here’s an example of how you can set up Occurrent Deadline Scheduling in Spring Boot:

@Configuration
public class DeadlineSpringConfig {

    @Bean
    JobRunrConfigurationResult initJobRunr(ApplicationContext applicationContext, MongoClient mongoClient,
                                           @Value("${spring.data.mongodb.uri}") String mongoUri) {
        var connectionString = new ConnectionString(mongoUri);
        var database = connectionString.getDatabase();

        return JobRunr.configure()
                .useJobActivator(applicationContext::getBean)
                .useStorageProvider(new MongoDBStorageProvider(mongoClient, database, "jobrunr-"))
                .useBackgroundJobServer()
                .useDashboard(
                        JobRunrDashboardWebServerConfiguration
                                .usingStandardDashboardConfiguration()
                                .andPort(8082)
                                .andAllowAnonymousDataUsage(false)
                )
                .initialize();
    }

    @PreDestroy
    void destroyJobRunnerOnShutdown() {
        JobRunr.destroy();
    }

    @Bean
    DeadlineScheduler deadlineScheduler(JobRunrConfigurationResult jobRunrConfigurationResult) {
        return new JobRunrDeadlineScheduler(jobRunrConfigurationResult.getJobRequestScheduler());
    }

    @Bean
    DeadlineConsumerRegistry deadlineConsumerRegistry() {
        return new JobRunrDeadlineConsumerRegistry();
    }
}

Have a look at JobRunr for more configuration options.

In-Memory Deadline Scheduler

This is an in-memory, non-persistent (meaning that scheduled deadlines will be lost on application restart), DeadlineScheduler. To get started, depend on:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>deadline-inmemory</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:deadline-inmemory:0.19.8'
libraryDependencies += "org.occurrent" % "deadline-inmemory" % "0.19.8"
@Grab(group='org.occurrent', module='deadline-inmemory', version='0.19.8') 
[org.occurrent/deadline-inmemory "0.19.8"]
'org.occurrent:deadline-inmemory:jar:0.19.8'
<dependency org="org.occurrent" name="deadline-inmemory" rev="0.19.8" />

Next, you need to create an instance of org.occurrent.deadline.inmemory.InMemoryDeadlineScheduler and org.occurrent.deadline.inmemory.InMemoryDeadlineConsumerRegistry. In order for these two components to communicate with each other, you also need to provide the same java.util.concurrent.BlockingDeque instance to both constructors. Here’s an example:

BlockingDeque<Object> queue = new LinkedBlockingDeque<>();
DeadlineConsumerRegistry deadlineConsumerRegistry = new InMemoryDeadlineConsumerRegistry(queue);
DeadlineScheduler deadlineScheduler = new InMemoryDeadlineScheduler(queue);

You can configure things, such as poll interval and retry strategy, for InMemoryDeadlineConsumerRegistry by supplying an instance of org.occurrent.deadline.inmemory.InMemoryDeadlineConsumerRegistry.Config as the second constructor argument:

new InMemoryDeadlineConsumerRegistry(queue, new Config().pollIntervalMillis(300).retryStrategy(RetryStrategy.fixed(Duration.of(2, SECONDS))));

Note that it’s very important to call shutdown on both InMemoryDeadlineConsumerRegistry and InMemoryDeadlineScheduler on application/test end.
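
The role of the shared queue can be illustrated with a minimal plain Java sketch. It does not use the Occurrent classes: the DueDeadline message type and the map of consumers are hypothetical stand-ins for what a scheduler and a consumer registry might exchange, and everything runs on one thread, whereas a real registry would poll the queue in the background.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;

// Hypothetical sketch: two components communicating through one shared BlockingDeque.
class SharedQueueSketch {

    // Message placed on the queue when a deadline is up (illustrative type)
    static final class DueDeadline {
        final String category;
        final Object data;

        DueDeadline(String category, Object data) {
            this.category = category;
            this.data = data;
        }
    }

    static String demo() {
        // Both sides must be given the same queue instance
        BlockingDeque<Object> queue = new LinkedBlockingDeque<>();

        // "Registry" side: one consumer per category
        StringBuilder received = new StringBuilder();
        Map<String, Consumer<Object>> consumersByCategory = new HashMap<>();
        consumersByCategory.put("hello-world", data -> received.append(data));

        // "Scheduler" side: publish a deadline that is up
        queue.offer(new DueDeadline("hello-world", "hello world"));

        // "Registry" side: take the next message and dispatch it by category
        Object message = queue.poll();
        if (message instanceof DueDeadline) {
            DueDeadline due = (DueDeadline) message;
            Consumer<Object> consumer = consumersByCategory.get(due.category);
            if (consumer != null) {
                consumer.accept(due.data);
            }
        }
        return received.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints hello world
    }
}
```

If the two sides were given different queue instances, the message would never reach the consumer, which is why the same instance is passed to both constructors.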

For usage examples, see Deadlines and JobRunr Scheduler.

Other Ways of Expressing Deadlines

If you don’t want to use any of the Occurrent libraries for deadline scheduling, or if you’re looking for more features that are not (yet) available, you can use other libraries from the Java ecosystem, such as:

Getting started

If you're using Spring Boot, you might consider using the spring-boot-starter project to get started quickly. Then you can return to this section.

Getting started with Occurrent involves these steps:

It's recommended to read up on CloudEvents and its specification so that you're familiar with the structure and schema of a CloudEvent.
  1. Choose an underlying datastore for an event store. Luckily there are only two choices at the moment, MongoDB and an in-memory implementation. Hopefully this will be a more difficult decision in the future :)
  2. Once a datastore has been decided, it’s time to choose an EventStore implementation for this datastore since there may be more than one.
  3. If you need subscriptions (i.e. the ability to subscribe to changes from an EventStore) then you need to pick a library that implements this for the datastore that you’ve chosen. Again, there may be several implementations to choose from.
  4. If a subscriber needs to be able to continue from where it left off on application restart, it’s worth looking into a so-called position storage library. These libraries provide means to automatically (or selectively) store the position for a subscriber to a datastore. Note that the datastore that stores this position can be a different datastore than the one used as EventStore. For example, you can use MongoDB as EventStore but store subscription positions in Redis.
  5. You’re now good to go, but you may also want to look into higher-level components if you don’t need to roll your own. We recommend looking into:

Choosing An EventStore

There are currently two different datastores to choose from, MongoDB and In-Memory.

MongoDB

Uses MongoDB, version 4.2 or above, as the underlying datastore for the CloudEvents. All implementations use transactions to guarantee consistent writes (see WriteCondition). Each EventStore will automatically create a few indexes on startup to allow for fast consistent writes, optimistic concurrency control and to avoid duplicated events. These indexes can also be used in queries against the EventStore (see EventStoreQueries).

There are three different MongoDB EventStore implementations to choose from:

MongoDB Schema

All MongoDB EventStore implementations try to stay as close as possible to the CloudEvents specification, even in the persistence layer. Occurrent, by default, automatically adds a custom “Occurrent extension” to each cloud event that is written to an EventStore. The Occurrent CloudEvent Extension consists of these attributes:


Attribute Name | Type | Description
streamid | String | An id that uniquely identifies a particular event stream. It’s used to determine which events belong to which stream.
streamversion | Long | The id of the stream version for a particular event. It’s used for optimistic concurrency control.
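
To illustrate how a stream version can support optimistic concurrency control, here is a minimal in-memory sketch. It is not how Occurrent implements this (the MongoDB event stores rely on transactions and indexes); the VersionedStreams class and its methods are hypothetical names made up for this example, and events are plain strings rather than CloudEvents.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory sketch of version-checked writes; not Occurrent's implementation. */
final class VersionedStreams {
    // streamid -> events in that stream; the stream version is simply the number of events
    private final Map<String, List<String>> streams = new HashMap<>();

    synchronized long currentVersion(String streamId) {
        List<String> events = streams.get(streamId);
        return events == null ? 0 : events.size();
    }

    /** Appends the events only if the stream is still at expectedVersion; returns the new version. */
    synchronized long write(String streamId, long expectedVersion, List<String> newEvents) {
        long actual = currentVersion(streamId);
        if (actual != expectedVersion) {
            throw new IllegalStateException(
                    "Expected version " + expectedVersion + " but stream " + streamId + " is at " + actual);
        }
        streams.computeIfAbsent(streamId, id -> new ArrayList<>()).addAll(newEvents);
        return currentVersion(streamId);
    }

    public static void main(String[] args) {
        VersionedStreams store = new VersionedStreams();
        long v1 = store.write("game-1", 0, List.of("NumberGuessingGameWasStarted"));
        System.out.println(v1); // 1
        try {
            // A writer that still believes the stream is at version 0 is rejected
            store.write("game-1", 0, List.of("PlayerGuessedANumberThatWasTooSmall"));
        } catch (IllegalStateException e) {
            System.out.println("Rejected stale write");
        }
        System.out.println(store.write("game-1", v1, List.of("PlayerGuessedANumberThatWasTooSmall"))); // 2
    }
}
```

A writer whose expected version is out of date has to re-read the stream and retry, which is the essence of optimistic concurrency control.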

A json schema describing a complete Occurrent CloudEvent, as it will be persisted to a MongoDB collection, can be found here (a “raw” cloud event json schema can be found here for comparison).

Note that MongoDB will automatically add an _id field (which is not used by Occurrent). The reason why the CloudEvent id attribute is not stored as _id in MongoDB is that the id of a CloudEvent is not globally unique on its own! Only the combination of id and source uniquely identifies a CloudEvent. Note also that _id will not be included when the CloudEvent is read from an EventStore.
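
As a small illustration of why id alone is not enough, the sketch below tracks events by the pair (id, source), which is what a unique compound index on these two attributes enforces. SeenCloudEvents and the second source URN are hypothetical and exist only for this example.

```java
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper that mimics the uniqueness rule for the id + source combination. */
final class SeenCloudEvents {
    private final Set<List<String>> seen = new HashSet<>();

    /** Returns true if the (id, source) pair is new, false if it has been seen before. */
    boolean markSeen(String id, URI source) {
        return seen.add(List.of(id, source.toString()));
    }

    public static void main(String[] args) {
        SeenCloudEvents events = new SeenCloudEvents();
        URI games = URI.create("urn:occurrent:domain:numberguessinggame");
        URI other = URI.create("urn:occurrent:domain:uno"); // made-up source for the example
        System.out.println(events.markSeen("1", games)); // true
        System.out.println(events.markSeen("1", other)); // true: same id, different source
        System.out.println(events.markSeen("1", games)); // false: duplicate id + source
    }
}
```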

Here’s an example of what you can expect to see in the “events” collection when storing events in an EventStore backed by MongoDB (given that TimeRepresentation is set to DATE):

{
	"_id" : ObjectId("5f4112a348b8da5305e41f57"),
	"specversion" : "1.0",
	"id" : "bdb8481f-9e8e-443b-80a4-5ef787f0f227",
	"source" : "urn:occurrent:domain:numberguessinggame",
	"type" : "NumberGuessingGameWasStarted",
	"subject" : "a1fc6ba1-7cd4-45cf-8dcc-b357fe23956d",
	"time" : ISODate("2020-08-22T14:42:11.712Z"),
	"data" : {
		"secretNumberToGuess" : 8,
		"startedBy" : "003ab97b-df79-4bf1-8c0c-08a5dd3701cf",
		"maxNumberOfGuesses" : 5
	},
	"streamid" : "a1fc6ba1-7cd4-45cf-8dcc-b357fe23956d",
	"streamversion" : NumberLong(1)
}
{
	"_id" : ObjectId("5f4112a548b8da5305e41f58"),
	"specversion" : "1.0",
	"id" : "c1bfc3a5-1716-43ae-88a6-297189b1b5c7",
	"source" : "urn:occurrent:domain:numberguessinggame",
	"type" : "PlayerGuessedANumberThatWasTooSmall",
	"subject" : "a1fc6ba1-7cd4-45cf-8dcc-b357fe23956d",
	"time" : ISODate("2020-08-22T14:42:13.336Z"),
	"data" : {
		"guessedNumber" : 1,
		"playerId" : "003ab97b-df79-4bf1-8c0c-08a5dd3701cf"
	},
	"streamid" : "a1fc6ba1-7cd4-45cf-8dcc-b357fe23956d",
	"streamversion" : NumberLong(2)
}

MongoDB Time Representation

The CloudEvents specification says that the time attribute, if present, must adhere to the RFC 3339 specification. To accommodate this in MongoDB, the time attribute must be persisted as a String. This is not a problem in itself; a problem only arises if you want to make time-based queries on the events persisted to a MongoDB-backed EventStore (using the EventStoreQueries interface). This is because time-based queries on Strings are suboptimal (to say the least) and may lead to surprising results. What we would like to do is to persist the time attribute as a Date in MongoDB, but MongoDB internally represents a Date with only millisecond resolution (see here), and then the CloudEvent cannot be compliant with the RFC 3339 specification in all circumstances.

Because of the reasons described above, users of a MongoDB-backed EventStore implementation, must decide how the time attribute is to be represented in MongoDB when instantiating an EventStore implementation. This is done by passing a value from the org.occurrent.mongodb.timerepresentation.TimeRepresentation enum to an EventStoreConfig object that is then passed to the EventStore implementation. TimeRepresentation has these values:

Value | Description
RFC_3339_STRING | Persist the time attribute as an RFC 3339 string. This string is able to represent both nanoseconds and a timezone, so this is recommended for apps that need to store this information or if you are uncertain whether this will be required in the future.
DATE | Persist the time attribute as a MongoDB Date. The benefit of using this approach is that you can do range queries etc. on the “time” field of the cloud event. This can be really useful for certain types of analytics or projections (such as showing the 10 most recently started games) without writing any custom code.

Note that if you choose to go with RFC_3339_STRING you always have the option of adding a custom attribute, named for example “date”, in which you represent the “time” attribute as a Date when writing the events to an EventStore. This way you have the ability to use the “time” attribute to re-construct the CloudEvent time attribute exactly as well as the ability to do custom time-queries on the “date” attribute. However, you cannot use the methods involving time-based queries when using the EventStoreQueries interface.

Important: There’s yet another option! If you don’t need nanotime precision (i.e. you’re fine with millisecond precision) and you’re OK with always representing the “time” attribute in UTC, then you can use TimeRepresentation.DATE without loss of precision! This is why, if DATE is configured for the EventStore, Occurrent will refuse to store a CloudEvent that specifies nanotime or is not defined in UTC (so that there won’t be any surprises). I.e. using DATE and then doing this will throw an IllegalArgumentException:

var cloudEvent = CloudEventBuilder.v1().withTime(OffsetDateTime.now()). .. .build();

// Will throw exception since OffsetDateTime.now() will include nanoseconds by default in Java 9+
eventStore.write("streamId", Stream.of(cloudEvent));

Instead, you need to remove the nanoseconds explicitly, like this:

// Truncate to millisecond precision and make sure to use UTC as timezone
var now = OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS).withOffsetSameInstant(ZoneOffset.UTC);
var cloudEvent = CloudEventBuilder.v1().withTime(now). .. .build();

// Now you can write the cloud event

For more thoughts on this, refer to the architecture decision record on time representation in MongoDB.
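
The normalization described above can be tried out in isolation using only java.time. The following is a minimal sketch; CloudEventTimes, toMillisUtc and isMillisUtc are hypothetical helper names and not part of Occurrent.

```java
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

/** Hypothetical helpers for preparing a "time" value for millisecond-resolution UTC storage. */
final class CloudEventTimes {
    private CloudEventTimes() {}

    /** Truncates to millisecond precision and converts to UTC while keeping the same instant. */
    static OffsetDateTime toMillisUtc(OffsetDateTime time) {
        return time.truncatedTo(ChronoUnit.MILLIS).withOffsetSameInstant(ZoneOffset.UTC);
    }

    /** True if the value is in UTC and has no sub-millisecond part. */
    static boolean isMillisUtc(OffsetDateTime time) {
        return time.getOffset().equals(ZoneOffset.UTC) && time.getNano() % 1_000_000 == 0;
    }

    public static void main(String[] args) {
        OffsetDateTime raw = OffsetDateTime.parse("2020-08-22T16:42:11.712345678+02:00");
        OffsetDateTime normalized = toMillisUtc(raw);
        System.out.println(isMillisUtc(raw));        // false
        System.out.println(normalized);              // 2020-08-22T14:42:11.712Z
        System.out.println(isMillisUtc(normalized)); // true
    }
}
```

Checking a value with something like isMillisUtc before writing can give an earlier and more descriptive failure than waiting for the event store to reject the event.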

MongoDB Indexes

Each MongoDB EventStore implementation creates a few indexes for the “events collection” the first time they’re instantiated. These are:

Name | Properties | Description
id + source | ascending id, descending source, unique | Compound index of id and source to comply with the specification that the id+source combination must be unique.
streamid + streamversion | ascending streamid, descending streamversion, unique | Compound index of streamid and streamversion (Occurrent CloudEvent extension) used for fast retrieval of the latest cloud event in a stream. Prior to version 0.7.3, a streamid index was also automatically created, but it was removed in 0.7.3 since this index is covered by the streamid+streamversion index.

To allow for fast queries, for example when using EventStoreQueries, it’s recommended to create additional indexes tailored to the querying behavior of your application. See MongoDB indexes for more information on how to do this. If you have many ad hoc queries it’s also worth checking out wildcard indexes, which were introduced in MongoDB 4.2. These allow you to create indexes that support arbitrary queries on e.g. the data attribute of a cloud event (if data is stored in json/bson format).

MongoDB EventStore Implementations

There are three different MongoDB EventStore implementations to choose from:

EventStore with MongoDB Native Driver

What is it?

An EventStore implementation that uses the “native” Java MongoDB synchronous driver (see website) to read and write CloudEvent’s to MongoDB.

When to use?

Use when you don’t need Spring support and want to use MongoDB as the underlying datastore.

Dependencies

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>eventstore-mongodb-native</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:eventstore-mongodb-native:0.19.8'
libraryDependencies += "org.occurrent" % "eventstore-mongodb-native" % "0.19.8"
@Grab(group='org.occurrent', module='eventstore-mongodb-native', version='0.19.8') 
[org.occurrent/eventstore-mongodb-native "0.19.8"]
'org.occurrent:eventstore-mongodb-native:jar:0.19.8'
<dependency org="org.occurrent" name="eventstore-mongodb-native" rev="0.19.8" />

Getting Started

Once you’ve imported the dependencies you create a new instance of org.occurrent.eventstore.mongodb.nativedriver.MongoEventStore. It takes four arguments: a MongoClient, the “database” and “event collection” that the EventStore will use to store events, and an org.occurrent.eventstore.mongodb.nativedriver.EventStoreConfig.

For example:

MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
// How the CloudEvent "time" property will be serialized in MongoDB! !!Important!!
EventStoreConfig config = new EventStoreConfig(TimeRepresentation.RFC_3339_STRING);
String mongoDatabase = "database";
String mongoEventCollection = "events";

MongoEventStore eventStore = new MongoEventStore(mongoClient, mongoDatabase, mongoEventCollection, config);
val mongoClient = MongoClients.create("mongodb://localhost:27017")
// How the CloudEvent "time" property will be serialized in MongoDB! !!Important!!
val config = EventStoreConfig(TimeRepresentation.RFC_3339_STRING)
val mongoDatabase = "database"
val mongoEventCollection = "events"

val eventStore = MongoEventStore(mongoClient, mongoDatabase, mongoEventCollection, config)

Now you can start reading and writing events to the EventStore:

CloudEvent event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(OffsetDateTime.now(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }".getBytes(StandardCharsets.UTF_8))
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event));

// Read
EventStream<CloudEvent> eventStream = eventStore.read("streamId");
val event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(OffsetDateTime.now(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }".toByteArray())
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event))

// Read
val eventStream : EventStream<CloudEvent> = eventStore.read("streamId")

Examples

Name Description
Number Guessing Game A simple game implemented using a pure domain model and stores events in MongoDB using MongoEventStore. It also generates integration events and publishes these to RabbitMQ.
Uno A port of FsUno, a classic card game. Stores events in MongoDB using MongoEventStore.

EventStore with Spring MongoTemplate (Blocking)

What is it?

An implementation that uses Spring’s MongoTemplate to read and write events to/from MongoDB.

When to use?

If you’re already using Spring and you don’t need reactive support then this is a good choice. You can make use of the @Transactional annotation to write events and views in the same transaction (but make sure you understand what you’re doing before attempting this).

Dependencies

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>eventstore-mongodb-spring-blocking</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:eventstore-mongodb-spring-blocking:0.19.8'
libraryDependencies += "org.occurrent" % "eventstore-mongodb-spring-blocking" % "0.19.8"
@Grab(group='org.occurrent', module='eventstore-mongodb-spring-blocking', version='0.19.8') 
[org.occurrent/eventstore-mongodb-spring-blocking "0.19.8"]
'org.occurrent:eventstore-mongodb-spring-blocking:jar:0.19.8'
<dependency org="org.occurrent" name="eventstore-mongodb-spring-blocking" rev="0.19.8" />

Getting Started

Once you’ve imported the dependencies you create a new instance of org.occurrent.eventstore.mongodb.spring.blocking.SpringMongoEventStore. It takes two arguments, a MongoTemplate and an org.occurrent.eventstore.mongodb.spring.blocking.EventStoreConfig.

For example:

MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
MongoTransactionManager mongoTransactionManager = new MongoTransactionManager(new SimpleMongoClientDatabaseFactory(mongoClient, "database"));

MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, "database");
EventStoreConfig eventStoreConfig = new EventStoreConfig.Builder()
                                                         // The collection where all events will be stored 
                                                        .eventStoreCollectionName("events")
                                                        .transactionConfig(mongoTransactionManager)
                                                        // How the CloudEvent "time" property will be serialized in MongoDB! !!Important!! 
                                                        .timeRepresentation(TimeRepresentation.RFC_3339_STRING)
                                                        .build();

SpringMongoEventStore eventStore = new SpringMongoEventStore(mongoTemplate, eventStoreConfig);
val mongoClient = MongoClients.create("mongodb://localhost:27017")
val mongoTransactionManager = MongoTransactionManager(SimpleMongoClientDatabaseFactory(mongoClient, "database"))

val mongoTemplate = MongoTemplate(mongoClient, "database")
val eventStoreConfig = EventStoreConfig.Builder()
                                        // The collection where all events will be stored
                                       .eventStoreCollectionName("events")
                                       .transactionConfig(mongoTransactionManager)
                                        // How the CloudEvent "time" property will be serialized in MongoDB! !!Important!!
                                       .timeRepresentation(TimeRepresentation.RFC_3339_STRING)
                                       .build()

val eventStore = SpringMongoEventStore(mongoTemplate, eventStoreConfig)

Now you can start reading and writing events to the EventStore:

CloudEvent event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(OffsetDateTime.now(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }".getBytes(StandardCharsets.UTF_8))
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event));

// Read
EventStream<CloudEvent> eventStream = eventStore.read("streamId");
val event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(OffsetDateTime.now(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }".toByteArray())
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event))

// Read
val eventStream : EventStream<CloudEvent> = eventStore.read("streamId")

Examples

Name Description
Number Guessing Game A simple game implemented using a pure domain model and stores events in MongoDB using SpringMongoEventStore and Spring Boot. It also generates integration events and publishes these to RabbitMQ.
Word Guessing Game Similar to the “Number Guessing Game” but more advanced, leveraging several Occurrent features such as CQRS, queries, and transactional projections. Implemented using a pure domain model and stores events in MongoDB using SpringMongoEventStore and Spring Boot.
Uno A port of FsUno, a classic card game. Implemented using a pure domain model and stores events in MongoDB using SpringMongoEventStore and Spring Boot.
Subscription View An example showing how to create a subscription that listens to certain events stored in the EventStore and updates a view/projection from these events.
Transactional View An example showing how to combine writing events to the SpringMongoEventStore and update a view transactionally using the @Transactional annotation.
Custom Aggregation View Example demonstrating that you can query the SpringMongoEventStore using custom MongoDB aggregations.

EventStore with Spring ReactiveMongoTemplate (Reactive)

What is it?

An implementation that uses Spring’s ReactiveMongoTemplate to read and write events to/from MongoDB.

When to use?

If you’re already using Spring and want to use the reactive driver (project reactor) then this is a good choice. It uses the ReactiveMongoTemplate to write events to MongoDB. You can make use of the @Transactional annotation to write events and views in the same transaction (but make sure that you understand what you’re doing before attempting this).

Dependencies

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>eventstore-mongodb-spring-reactor</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:eventstore-mongodb-spring-reactor:0.19.8'
libraryDependencies += "org.occurrent" % "eventstore-mongodb-spring-reactor" % "0.19.8"
@Grab(group='org.occurrent', module='eventstore-mongodb-spring-reactor', version='0.19.8') 
[org.occurrent/eventstore-mongodb-spring-reactor "0.19.8"]
'org.occurrent:eventstore-mongodb-spring-reactor:jar:0.19.8'
<dependency org="org.occurrent" name="eventstore-mongodb-spring-reactor" rev="0.19.8" />

Getting Started

Once you’ve imported the dependencies you create a new instance of org.occurrent.eventstore.mongodb.spring.reactor.ReactorMongoEventStore. It takes two arguments, a ReactiveMongoTemplate and an org.occurrent.eventstore.mongodb.spring.reactor.EventStoreConfig.

For example:

// Note: MongoClient/MongoClients come from the reactive driver (com.mongodb.reactivestreams.client)
MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
ReactiveMongoTransactionManager mongoTransactionManager = new ReactiveMongoTransactionManager(new SimpleReactiveMongoDatabaseFactory(mongoClient, "database"));

ReactiveMongoTemplate mongoTemplate = new ReactiveMongoTemplate(mongoClient, "database");
EventStoreConfig eventStoreConfig = new EventStoreConfig.Builder()
                                                         // The collection where all events will be stored 
                                                        .eventStoreCollectionName("events")
                                                        .transactionConfig(mongoTransactionManager)
                                                        // How the CloudEvent "time" property will be serialized in MongoDB! !!Important!! 
                                                        .timeRepresentation(TimeRepresentation.RFC_3339_STRING)
                                                        .build();

ReactorMongoEventStore eventStore = new ReactorMongoEventStore(mongoTemplate, eventStoreConfig);
// Note: MongoClients comes from the reactive driver (com.mongodb.reactivestreams.client)
val mongoClient = MongoClients.create("mongodb://localhost:27017")
val mongoTransactionManager = ReactiveMongoTransactionManager(SimpleReactiveMongoDatabaseFactory(mongoClient, "database"))

val mongoTemplate = ReactiveMongoTemplate(mongoClient, "database")
val eventStoreConfig = EventStoreConfig.Builder()
                                        // The collection where all events will be stored
                                       .eventStoreCollectionName("events")
                                       .transactionConfig(mongoTransactionManager)
                                        // How the CloudEvent "time" property will be serialized in MongoDB! !!Important!!
                                       .timeRepresentation(TimeRepresentation.RFC_3339_STRING)
                                       .build()

val eventStore = ReactorMongoEventStore(mongoTemplate, eventStoreConfig)

Now you can start reading and writing events to the EventStore:

CloudEvent event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(OffsetDateTime.now(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }".getBytes(StandardCharsets.UTF_8))
                    .build();

// Write                    
Mono<Void> mono = eventStore.write("streamId", Flux.just(event));

// Read
Mono<EventStream<CloudEvent>> eventStream = eventStore.read("streamId");
val event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(OffsetDateTime.now(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }".toByteArray())
                    .build();

// Write                    
val mono = eventStore.write("streamId", Flux.just(event))

// Read
val eventStream : Mono<EventStream<CloudEvent>> = eventStore.read("streamId")

Examples

Name Description
Custom Aggregation View Example demonstrating that you can query the SpringMongoEventStore using custom MongoDB aggregations.

In-Memory EventStore

What is it?

A simple in-memory implementation of the EventStore interface.

When to use?

Mainly for testing purposes, for example integration tests that don’t require a durable event store.

Dependencies

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>eventstore-inmemory</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:eventstore-inmemory:0.19.8'
libraryDependencies += "org.occurrent" % "eventstore-inmemory" % "0.19.8"
@Grab(group='org.occurrent', module='eventstore-inmemory', version='0.19.8') 
[org.occurrent/eventstore-inmemory "0.19.8"]
'org.occurrent:eventstore-inmemory:jar:0.19.8'
<dependency org="org.occurrent" name="eventstore-inmemory" rev="0.19.8" />

Getting Started

Once you’ve imported the dependencies you create a new instance of org.occurrent.eventstore.inmemory.InMemoryEventStore. For example:

InMemoryEventStore eventStore = new InMemoryEventStore();
val eventStore = InMemoryEventStore()

Now you can start reading and writing events to the EventStore:

CloudEvent event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(OffsetDateTime.now(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }".getBytes(StandardCharsets.UTF_8))
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event));

// Read
EventStream<CloudEvent> eventStream = eventStore.read("streamId");
val event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(OffsetDateTime.now(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }".toByteArray())
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event))

// Read
val eventStream : EventStream<CloudEvent> = eventStore.read("streamId")

Using Subscriptions

Before you start using subscriptions you should read up on what they are here.

There are two different kinds of subscriptions, blocking subscriptions and reactive subscriptions. For blocking subscription implementations see here and for reactive subscription implementations see here.

Blocking Subscriptions

A “blocking subscription” is a subscription that uses the normal Java threading mechanism for IO operations, i.e. reading changes from an EventStore will block the thread. This is arguably the easiest and most familiar way to use subscriptions for the typical Java developer, and it’s probably good enough for most scenarios. If high throughput and low CPU and memory consumption are critical, then consider using reactive subscriptions instead. Reactive subscriptions are also better suited if you want to work with streaming data.

To create a blocking subscription, you first need to choose which “subscription model” to use. Then you create a subscription instance from this subscription model. All blocking subscriptions implement the org.occurrent.subscription.api.blocking.SubscriptionModel interface. This interface provides means to subscribe to new events from an EventStore as they are written. For example:

subscriptionModel.subscribe("mySubscriptionId", System.out::println);
subscriptionModel.subscribe("mySubscriptionId", ::println)

This will simply print each cloud event written to the event store to the console.

Note that the signature of subscribe is defined like this:

public interface SubscriptionModel {
    /**
     * Start listening to cloud events persisted to the event store using the supplied start position and <code>filter</code>.
     *
     * @param subscriptionId  The id of the subscription, must be unique!
     * @param filter          The filter used to limit which events are of interest from the EventStore.
     * @param startAt         The position to start the subscription from
     * @param action          This action will be invoked for each cloud event that is stored in the EventStore.
     */
    Subscription subscribe(String subscriptionId, SubscriptionFilter filter, StartAt startAt, Consumer<CloudEvent> action);

    // Default methods 

}

It’s common that subscriptions produce “wrappers” around the vanilla io.cloudevents.CloudEvent type that include the subscription position (if the datastore doesn’t maintain the subscription position on behalf of the clients). Someone, either you as the client or the datastore, needs to keep track of this position for each individual subscriber (“mySubscriptionId” in the example above). If the datastore doesn’t provide this feature, you should use a SubscriptionModel implementation that also implements the org.occurrent.subscription.api.blocking.PositionAwareSubscriptionModel interface. The PositionAwareSubscriptionModel is an example of a SubscriptionModel that returns a wrapper around io.cloudevents.CloudEvent called org.occurrent.subscription.PositionAwareCloudEvent which adds an additional method, SubscriptionPosition getStreamPosition(), that you can use to get the current subscription position. You can check if a cloud event contains a subscription position by calling PositionAwareCloudEvent.hasSubscriptionPosition(cloudEvent) and then get the position by using PositionAwareCloudEvent.getSubscriptionPositionOrThrowIAE(cloudEvent). Note that PositionAwareCloudEvent is fully compatible with io.cloudevents.CloudEvent and it’s ok to treat it as such. So given that you’re subscribing from a PositionAwareSubscriptionModel, you are responsible for keeping track of the subscription position, so that it’s possible to resume this subscription from the last known position on application restart. This interface also provides means to get the so-called “current global subscription position”, by calling the globalSubscriptionPosition method, which can be useful when starting a new subscription.

For example, consider the case when subscription “A” starts subscribing at the current time (T1). Event E1 is written to the EventStore and propagated to subscription “A”. But imagine there’s a bug in “A” that prevents it from performing its action. Later, the bug is fixed and the application is restarted at the “current time” (T2). But since T2 is after T1, E1 will not be sent to “A” again since it happened before T2. Thus this event is missed! Whether or not this is actually a problem depends on your use case. But to avoid it you should not start the subscription at the “current time”, but rather from the “global subscription position”. This position should be written to a subscription position storage before subscription “A” is started. Thus the subscription can continue from this position on application restart and no events will be missed.
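
The scenario above can be simulated with a plain list standing in for the event store. This is only a sketch to make the reasoning concrete: MissedEventDemo and eventsFrom are hypothetical names, and using an integer index as the position is an assumption for the example (real subscription positions are datastore-specific).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of starting at "now" versus starting at a stored position. */
final class MissedEventDemo {
    /** Returns the events at or after the given position (a 0-based index into the log). */
    static List<String> eventsFrom(List<String> log, int position) {
        return new ArrayList<>(log.subList(Math.min(position, log.size()), log.size()));
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        int globalPositionAtT1 = log.size(); // stored before subscription "A" is started
        log.add("E1");                       // written while "A" is broken, so never processed
        int nowAtT2 = log.size();            // "current time" when the fixed application restarts

        System.out.println(eventsFrom(log, nowAtT2));            // [] (E1 is missed)
        System.out.println(eventsFrom(log, globalPositionAtT1)); // [E1] (E1 is delivered)
    }
}
```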

Blocking Subscription Filters

You can also provide a subscription filter, applied at the datastore level so that it’s really efficient, if you’re only interested in certain events:

subscriptionModel.subscribe("mySubscriptionId", filter(type("GameEnded")), System.out::println);
subscriptionModel.subscribe("mySubscriptionId", filter(type("GameEnded")), ::println)

This will print each cloud event of type “GameEnded” that is written to the event store to the console. The filter method is statically imported from org.occurrent.subscription.OccurrentSubscriptionFilter and type is statically imported from org.occurrent.condition.Condition. The OccurrentSubscriptionFilter is generic and should be applicable to a wide variety of different datastores. However, subscription implementations may provide different means to express filters. For example, the MongoDB subscription implementations allow you to use filters specific to MongoDB:

subscriptionModel.subscribe("mySubscriptionId", filter().id(Filters::eq, "3c0364c3-f4a7-40d3-9fb8-a4a62d7f66e3").type(Filters::eq, "GameStarted"), System.out::println);
subscriptionModel.subscribe("mySubscriptionId", filter().id(Filters::eq, "3c0364c3-f4a7-40d3-9fb8-a4a62d7f66e3").type(Filters::eq, "GameStarted"), ::println)

Now filter is statically imported from org.occurrent.subscription.mongodb.MongoDBFilterSpecification and Filters is imported from com.mongodb.client.model.Filters (i.e. the normal way to express filters in MongoDB). However, it’s recommended to always start with an OccurrentSubscriptionFilter and only pick a more specific implementation if you cannot express your filter using the capabilities of OccurrentSubscriptionFilter.

Blocking Subscription Start Position

A subscription can be started at different locations in the event store. You can define where to start when a subscription is started. This is done by supplying an org.occurrent.subscription.StartAt instance. It provides several ways to specify the start position, either by using StartAt.now(), StartAt.subscriptionModelDefault() (default if StartAt is not defined when calling the subscribe function), or StartAt.subscriptionPosition(<subscriptionPosition>), where <subscriptionPosition> is a datastore-specific implementation of the org.occurrent.subscription.SubscriptionPosition interface which provides the start position as a String. You may want to store the String returned by a SubscriptionPosition in a database so that it’s possible to resume a subscription from the last processed position on application restart. You can do this any way you like, but for most cases you should probably consider whether there’s a Subscription Position Storage available that suits your needs. If not, you can still have a look at them for inspiration on how to write your own.
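
If you decide to store the position String yourself, the bookkeeping can be as simple as a map from subscription id to position. The sketch below uses an in-memory map as a stand-in for a database; InMemoryPositionStore, its methods and the "position-42" value are hypothetical and only meant to show the resume-or-default decision made on restart.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a database table that maps subscription id to position string. */
final class InMemoryPositionStore {
    private final Map<String, String> positions = new ConcurrentHashMap<>();

    void save(String subscriptionId, String position) {
        positions.put(subscriptionId, position);
    }

    Optional<String> read(String subscriptionId) {
        return Optional.ofNullable(positions.get(subscriptionId));
    }

    void delete(String subscriptionId) {
        positions.remove(subscriptionId);
    }

    /** Describes where a (re)started subscription would begin. */
    String describeStart(String subscriptionId) {
        return read(subscriptionId)
                .map(position -> "resume from " + position)
                .orElse("use the subscription model default");
    }

    public static void main(String[] args) {
        InMemoryPositionStore store = new InMemoryPositionStore();
        System.out.println(store.describeStart("mySubscriptionId")); // use the subscription model default
        store.save("mySubscriptionId", "position-42");               // opaque, datastore-specific string
        System.out.println(store.describeStart("mySubscriptionId")); // resume from position-42
    }
}
```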

Blocking Subscription Position Storage

It’s very common that an application needs to start at its last known location in the subscription stream when it’s restarted. While you’re free to store the subscription position provided by a blocking subscription any way you like, Occurrent provides an interface called org.occurrent.subscription.api.blocking.SubscriptionPositionStorage that acts as a uniform abstraction for this purpose. A SubscriptionPositionStorage is defined like this:

public interface SubscriptionPositionStorage {
    SubscriptionPosition read(String subscriptionId);
    SubscriptionPosition save(String subscriptionId, SubscriptionPosition subscriptionPosition);
    void delete(String subscriptionId);
}

I.e. it’s a way to read/write/delete the SubscriptionPosition for a given subscription. Occurrent ships with three pre-defined implementations:

1. NativeMongoSubscriptionPositionStorage
Uses the vanilla MongoDB Java (sync) driver to store SubscriptionPosition instances in MongoDB.

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-mongodb-native-blocking-position-storage</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-native-blocking-position-storage:0.19.8'
libraryDependencies += "org.occurrent" % "subscription-mongodb-native-blocking-position-storage" % "0.19.8"
@Grab(group='org.occurrent', module='subscription-mongodb-native-blocking-position-storage', version='0.19.8') 
[org.occurrent/subscription-mongodb-native-blocking-position-storage "0.19.8"]
'org.occurrent:subscription-mongodb-native-blocking-position-storage:jar:0.19.8'
<dependency org="org.occurrent" name="subscription-mongodb-native-blocking-position-storage" rev="0.19.8" />

2. SpringMongoSubscriptionPositionStorage
Uses the Spring MongoTemplate to store SubscriptionPosition instances in MongoDB.

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-mongodb-spring-blocking-position-storage</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-spring-blocking-position-storage:0.19.8'
libraryDependencies += "org.occurrent" % "subscription-mongodb-spring-blocking-position-storage" % "0.19.8"
@Grab(group='org.occurrent', module='subscription-mongodb-spring-blocking-position-storage', version='0.19.8') 
[org.occurrent/subscription-mongodb-spring-blocking-position-storage "0.19.8"]
'org.occurrent:subscription-mongodb-spring-blocking-position-storage:jar:0.19.8'
<dependency org="org.occurrent" name="subscription-mongodb-spring-blocking-position-storage" rev="0.19.8" />

3. SpringRedisSubscriptionPositionStorage
Uses the Spring RedisTemplate to store SubscriptionPosition instances in Redis.

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-redis-spring-blocking-position-storage</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:subscription-redis-spring-blocking-position-storage:0.19.8'
libraryDependencies += "org.occurrent" % "subscription-redis-spring-blocking-position-storage" % "0.19.8"
@Grab(group='org.occurrent', module='subscription-redis-spring-blocking-position-storage', version='0.19.8') 
[org.occurrent/subscription-redis-spring-blocking-position-storage "0.19.8"]
'org.occurrent:subscription-redis-spring-blocking-position-storage:jar:0.19.8'
<dependency org="org.occurrent" name="subscription-redis-spring-blocking-position-storage" rev="0.19.8" />

If you want to roll your own implementation (feel free to contribute to the project if you do) you can depend on the “blocking subscription API” which contains the SubscriptionPositionStorage interface:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-api-blocking</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:subscription-api-blocking:0.19.8'
libraryDependencies += "org.occurrent" % "subscription-api-blocking" % "0.19.8"
@Grab(group='org.occurrent', module='subscription-api-blocking', version='0.19.8') 
[org.occurrent/subscription-api-blocking "0.19.8"]
'org.occurrent:subscription-api-blocking:jar:0.19.8'
<dependency org="org.occurrent" name="subscription-api-blocking" rev="0.19.8" />
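
As a rough illustration of what rolling your own implementation might involve, here is a minimal in-memory sketch. To stay self-contained it declares local stand-ins for SubscriptionPositionStorage (mirroring the definition shown earlier) and SubscriptionPosition; in a real project you would implement the interfaces from the subscription-api-blocking module instead. The names PositionStorageSketch, StringPosition and InMemoryPositionStorage are hypothetical, and both the asString() accessor and returning null when no position is stored are assumptions made for this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class PositionStorageSketch {

    // Local stand-in (assumption): a position that can be rendered as a String.
    interface SubscriptionPosition {
        String asString();
    }

    // Local stand-in mirroring the interface definition shown in the documentation.
    interface SubscriptionPositionStorage {
        SubscriptionPosition read(String subscriptionId);
        SubscriptionPosition save(String subscriptionId, SubscriptionPosition subscriptionPosition);
        void delete(String subscriptionId);
    }

    // Hypothetical position type that simply wraps the String form.
    static final class StringPosition implements SubscriptionPosition {
        private final String value;

        StringPosition(String value) {
            this.value = value;
        }

        @Override
        public String asString() {
            return value;
        }
    }

    // Keeps positions in a map; positions are lost when the JVM stops,
    // so this is only useful for tests or as a template for a real datastore.
    static final class InMemoryPositionStorage implements SubscriptionPositionStorage {
        private final Map<String, String> positions = new ConcurrentHashMap<>();

        @Override
        public SubscriptionPosition read(String subscriptionId) {
            String stored = positions.get(subscriptionId);
            // Assumption: null signals that no position has been stored yet.
            return stored == null ? null : new StringPosition(stored);
        }

        @Override
        public SubscriptionPosition save(String subscriptionId, SubscriptionPosition subscriptionPosition) {
            positions.put(subscriptionId, subscriptionPosition.asString());
            return subscriptionPosition;
        }

        @Override
        public void delete(String subscriptionId) {
            positions.remove(subscriptionId);
        }
    }
}
```

A durable implementation would replace the map with calls to your datastore, keyed by the subscription id.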

Blocking Subscription Implementations

These are the non-durable blocking subscription implementations:

MongoDB

It's important to recognize that MongoDB subscriptions are using the oplog so you need to make sure that you have enough oplog capacity to support your use case. You can also read more about this here. Typically this shouldn't be a problem, but if you have subscribers that risk falling behind, you may consider piping the events to e.g. Kafka and leverage it for these types of subscriptions. Note that you can always use a catch-up subscription to recover if this happens.

In-Memory

By “non-durable” we mean implementations that don’t store the subscription position in a durable storage automatically. It might be that the datastore does this automatically or that subscription position storage is not required for your use case. If the datastore doesn’t support storing the subscription position automatically, a subscription will typically implement the org.occurrent.subscription.api.blocking.PositionAwareSubscriptionModel interface (since these types of subscriptions need to be aware of the position).

Typically, if you want the stream to continue where it left off on application restart, you want to store away the subscription position. You can do this any way you like, but for most cases you probably want to look into implementations of org.occurrent.subscription.api.blocking.PositionAwareSubscriptionModel. These subscriptions can be combined with a subscription position storage implementation to store the position in a durable datastore.

Occurrent provides a utility that combines a PositionAwareSubscriptionModel and a SubscriptionPositionStorage (see here) to automatically store the subscription position after each processed event. You can tweak how often the position should be persisted in the configuration.

Blocking Subscription using the “Native” Java MongoDB Driver

Uses the vanilla Java MongoDB synchronous driver (no Spring dependency is required).

To get started first include the following dependency:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-mongodb-native-blocking</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-native-blocking:0.19.8'
libraryDependencies += "org.occurrent" % "subscription-mongodb-native-blocking" % "0.19.8"
@Grab(group='org.occurrent', module='subscription-mongodb-native-blocking', version='0.19.8') 
[org.occurrent/subscription-mongodb-native-blocking "0.19.8"]
'org.occurrent:subscription-mongodb-native-blocking:jar:0.19.8'
<dependency org="org.occurrent" name="subscription-mongodb-native-blocking" rev="0.19.8" />

Then create a new instance of NativeMongoSubscriptionModel and start subscribing:

MongoDatabase database = mongoClient.getDatabase("some-database");
// Create the blocking subscription
SubscriptionModel subscriptionModel = new NativeMongoSubscriptionModel(database, "eventCollection", TimeRepresentation.DATE, Executors.newCachedThreadPool(), RetryStrategy.retry().fixed(200));

// Now you can create subscription instances that subscribe to new events as they're written to an EventStore
subscriptionModel.subscribe("mySubscriptionId", cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent)); 

// You can later cancel the subscription by calling:
subscriptionModel.cancelSubscription("mySubscriptionId");
val database = mongoClient.getDatabase("some-database")
// Create the blocking subscription
val subscriptionModel = NativeMongoSubscriptionModel(database, "eventCollection", TimeRepresentation.DATE, Executors.newCachedThreadPool(), RetryStrategy.retry().fixed(200))

// Now you can create subscription instances that subscribe to new events as they're written to an EventStore
subscriptionModel.subscribe("mySubscriptionId") { cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent) }

// You can later cancel the subscription by calling:
subscriptionModel.cancelSubscription("mySubscriptionId")
NativeMongoSubscriptionModel can be imported from the "org.occurrent.subscription.mongodb.nativedriver.blocking" package.

There are a few things here that need explaining. First we have the TimeRepresentation.DATE that is passed as the third constructor argument, which you can read more about here. Secondly we have the Executors.newCachedThreadPool(). A thread will be created from this executor for each call to “subscribe” (i.e. for each subscription). Make sure that you have enough threads to cover all your subscriptions or the “SubscriptionModel” may hang. Last we have the RetryStrategy, which defines what should happen if there’s e.g. a connection issue during the lifetime of a subscription or if the subscription fails to process a cloud event (i.e. the action throws an exception).
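
To make the warning about thread capacity more concrete, the following sketch uses only java.util.concurrent and no Occurrent classes. It assumes, as stated above, that each subscription occupies one thread for as long as it is active, and simulates that with tasks that block until they are cancelled. The class and method names are hypothetical. With a pool of two threads and three subscriptions, the third one never gets to run.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ExecutorSizingSketch {

    // Starts `subscriptions` long-running tasks on a pool with `threads` threads and
    // returns the ids of the simulated subscriptions that actually got a thread.
    static Set<String> startedSubscriptions(int threads, int subscriptions) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        Set<String> started = ConcurrentHashMap.newKeySet();
        CountDownLatch expectedStarts = new CountDownLatch(Math.min(threads, subscriptions));
        CountDownLatch cancelled = new CountDownLatch(1);
        try {
            for (int i = 1; i <= subscriptions; i++) {
                String id = "subscription" + i;
                executor.execute(() -> {
                    started.add(id);
                    expectedStarts.countDown();
                    // Simulates a subscription that keeps its thread until it is cancelled.
                    awaitQuietly(cancelled);
                });
            }
            // Wait until every available thread is occupied, then take a snapshot.
            if (!expectedStarts.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Tasks did not start in time");
            }
            return Set.copyOf(started);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            cancelled.countDown();
            executor.shutdownNow();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Calling startedSubscriptions(2, 3) returns only the first two ids, which is why an unbounded pool such as Executors.newCachedThreadPool(), or a pool sized to at least the number of subscriptions, is the safer choice.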

Note that you can provide a filter, start position and position persistence for this subscription implementation.

Blocking Subscription using Spring MongoTemplate

An implementation that uses Spring’s MongoTemplate for event subscriptions.

First include the following dependency:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-mongodb-spring-blocking</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-spring-blocking:0.19.8'
libraryDependencies += "org.occurrent" % "subscription-mongodb-spring-blocking" % "0.19.8"
@Grab(group='org.occurrent', module='subscription-mongodb-spring-blocking', version='0.19.8') 
[org.occurrent/subscription-mongodb-spring-blocking "0.19.8"]
'org.occurrent:subscription-mongodb-spring-blocking:jar:0.19.8'
<dependency org="org.occurrent" name="subscription-mongodb-spring-blocking" rev="0.19.8" />

Then create a new instance of SpringMongoSubscriptionModel and start subscribing:

MongoTemplate mongoTemplate = ...
// Create the blocking subscription
SubscriptionModel subscriptionModel = new SpringMongoSubscriptionModel(mongoTemplate, "eventCollectionName", TimeRepresentation.RFC_3339_STRING);

// Now you can create subscription instances that subscribe to new events as they're written to an EventStore
// Typically you do this after the Spring application context has finished loading. For example by subscribing to 
// the  (org.springframework.boot.context.event.ApplicationStartedEvent) or in a method annotated with (@PostConstruct) 
subscriptionModel.subscribe("mySubscriptionId", cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent)); 

// You can later cancel the subscription by calling:
subscriptionModel.cancelSubscription("mySubscriptionId");
val mongoTemplate : MongoTemplate = ... 
// Create the blocking subscription
val subscriptionModel = SpringMongoSubscriptionModel(mongoTemplate, "eventCollectionName", TimeRepresentation.RFC_3339_STRING)

// Now you can create subscription instances that subscribe to new events as they're written to an EventStore
subscriptionModel.subscribe("mySubscriptionId") { cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent) }

// You can later cancel the subscription by calling:
subscriptionModel.cancelSubscription("mySubscriptionId")
SpringMongoSubscriptionModel can be imported from the "org.occurrent.subscription.mongodb.spring.blocking" package.

The “eventCollectionName” specifies the event collection in MongoDB where events are stored. It’s important that this collection is the same as the collection used by the EventStore implementation. Secondly, we have the TimeRepresentation.RFC_3339_STRING that is passed as the third constructor argument, which you can read more about here. It’s also very important that this is configured the same way as the EventStore.

It should also be noted that Spring takes care of re-attaching to MongoDB if there’s a connection issue or other transient errors. This can be configured when creating the MongoTemplate instance.

When it comes to retries, if the “action” fails (i.e. if the higher-order function you provide when calling subscribe throws an exception), you can handle this either with something like Spring Retry or with the Occurrent Retry Module. By default, all subscription models use the Occurrent retry module with exponential backoff, starting at 100 ms and progressively going up to a maximum wait time of 2 seconds between each retry, when reading/saving/deleting the subscription position. You can customize this by passing an instance of RetryStrategy to the SpringMongoSubscriptionModel constructor.
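
To give a feel for what an exponential backoff from 100 ms up to 2 seconds means in practice, here is a small standalone calculation. It does not use the Occurrent retry module; BackoffSketch and its methods are hypothetical, and the multiplier of 2.0 is an assumption, since only the start and maximum delays are stated above.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

final class BackoffSketch {

    // Delay before retry number `attempt` (1-based):
    // initial * multiplier^(attempt - 1), capped at max.
    static Duration delayFor(int attempt, Duration initial, Duration max, double multiplier) {
        double millis = initial.toMillis() * Math.pow(multiplier, attempt - 1);
        return Duration.ofMillis((long) Math.min(millis, (double) max.toMillis()));
    }

    // Delays in milliseconds for the first `attempts` retries, using 100 ms, 2 s and
    // an assumed multiplier of 2.0.
    static List<Long> delaysMillis(int attempts) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= attempts; attempt++) {
            delays.add(delayFor(attempt, Duration.ofMillis(100), Duration.ofSeconds(2), 2.0).toMillis());
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(delaysMillis(7)); // prints [100, 200, 400, 800, 1600, 2000, 2000]
    }
}
```

Under these assumptions the wait time reaches the 2 second cap on the sixth retry and stays there.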

If you want to disable the Occurrent retry module, pass RetryStrategy.none() to the SpringMongoSubscriptionModel constructor and then handle retries any way you see fit. For example, let’s say you want to use spring-retry, and you have a simple Spring bean that writes each cloud event to a repository:

@Component
public class WriteToRepository {

	private final SomeRepository someRepository;

	public WriteToRepository(SomeRepository someRepository) {
		this.someRepository = someRepository;
	}

	public void write(CloudEvent cloudEvent) {
		someRepository.persist(cloudEvent);
	}
}

And you want to subscribe to all “GameStarted” events and write them to the repository:

WriteToRepository writeToRepository = ...
subscriptionModel.subscribe("gameStartedLog", filter(type("GameStarted")), writeToRepository::write);

But if the connection to someRepository is flaky, you can add Spring Retry to allow for retries with exponential backoff:

@Component
public class WriteToRepository {

	private final SomeRepository someRepository;

	public WriteToRepository(SomeRepository someRepository) {
		this.someRepository = someRepository;
	}

	@Retryable(backoff = @Backoff(delay = 200, multiplier = 2, maxDelay = 30000))
	public void write(CloudEvent cloudEvent) {
		someRepository.persist(cloudEvent);
	}
}

Don’t forget to add @EnableRetry to your Spring Boot application as well.

Note that you can provide a filter, start position and position persistence for this subscription implementation.

Restart Subscription when Oplog Lost

If there’s not enough history available in the MongoDB oplog to resume a subscription created from a SpringMongoSubscriptionModel, you can configure it to restart the subscription from the current time automatically. This is only of concern when an application is restarted and the subscriptions are configured to start from a position in the oplog that is no longer available. It’s disabled by default since it might not be 100% safe (meaning that you can miss some events when the subscription is restarted). It’s not 100% safe if you run subscriptions in a different process than the event store and you have lots of writes happening to the event store. It’s safe if you run the subscription in the same process as the writes to the event store, provided that you make sure that the subscription is started before you accept writes to the event store on startup. To enable automatic restart, you can do this:

var subscriptionModel = new SpringMongoSubscriptionModel(mongoTemplate, SpringSubscriptionModelConfig.withConfig("events", TimeRepresentation.RFC_3339_STRING).restartSubscriptionsOnChangeStreamHistoryLost(true));

An alternative approach to restarting automatically is to use a catch-up subscription and restart the subscription from an earlier date.

InMemory Subscription

If you’re using the InMemory EventStore you can use the “InMemorySubscriptionModel” to subscribe to new events. First add the dependency:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-inmemory</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:subscription-inmemory:0.19.8'
libraryDependencies += "org.occurrent" % "subscription-inmemory" % "0.19.8"
@Grab(group='org.occurrent', module='subscription-inmemory', version='0.19.8') 
[org.occurrent/subscription-inmemory "0.19.8"]
'org.occurrent:subscription-inmemory:jar:0.19.8'
<dependency org="org.occurrent" name="subscription-inmemory" rev="0.19.8" />

Then you can use it like this:

InMemorySubscriptionModel subscriptionModel = new InMemorySubscriptionModel();
InMemoryEventStore inMemoryEventStore = new InMemoryEventStore(subscriptionModel);

subscriptionModel.subscribe("subscription1", System.out::println);

// You can later cancel the subscription by calling:
subscriptionModel.cancelSubscription("subscription1");
val subscriptionModel = InMemorySubscriptionModel()
val inMemoryEventStore = InMemoryEventStore(subscriptionModel)

subscriptionModel.subscribe("subscription1", ::println)

// You can later cancel the subscription by calling:
subscriptionModel.cancelSubscription("subscription1")

Durable Subscriptions (Blocking)

Storing the subscription position is useful if you need to resume a subscription from its last known position when restarting an application. Occurrent provides a utility that implements SubscriptionModel and combines a PositionAwareSubscriptionModel and a SubscriptionPositionStorage implementation (see here) to automatically store the subscription position, by default after each processed event. If you don’t want the position to be persisted after every event, you can control how often this should happen by supplying a predicate to DurableSubscriptionModelConfig. There’s a pre-defined predicate, org.occurrent.subscription.util.predicate.EveryN, that allows the position to be stored for every nth event instead of every event. There’s also a shortcut, e.g. new DurableSubscriptionModelConfig(3), that creates an instance of EveryN that stores the position for every third event.
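
The idea behind persisting the position only for every nth event can be sketched with a plain java.util.function.Predicate. This is not the EveryN class that ships with Occurrent; EveryNSketch, everyN and persistedAfter are hypothetical names, and events are represented as plain strings to keep the sketch self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

final class EveryNSketch {

    // Returns a stateful predicate that is true on every n-th invocation (n, 2n, 3n, ...).
    static <T> Predicate<T> everyN(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be at least 1");
        }
        AtomicLong seen = new AtomicLong();
        return ignored -> seen.incrementAndGet() % n == 0;
    }

    // Processes the events in order and records the events after which
    // the subscription position would have been persisted.
    static List<String> persistedAfter(List<String> events, Predicate<String> shouldPersist) {
        List<String> persisted = new ArrayList<>();
        for (String event : events) {
            // (the event would be handled by the subscriber here)
            if (shouldPersist.test(event)) {
                persisted.add(event);
            }
        }
        return persisted;
    }

    public static void main(String[] args) {
        List<String> events = List.of("e1", "e2", "e3", "e4", "e5", "e6", "e7");
        System.out.println(persistedAfter(events, everyN(3))); // prints [e3, e6]
    }
}
```

The trade-off is fewer writes to the position storage at the cost of reprocessing: in this sketch, a crash after "e7" would resume from the position saved at "e6", so "e7" would be delivered again.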

If you want full control, it’s recommended to pick a subscription position storage implementation, and store the position yourself using its API.

To use it, first we need to add the dependency:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>durable-subscription</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:durable-subscription:0.19.8'
libraryDependencies += "org.occurrent" % "durable-subscription" % "0.19.8"
@Grab(group='org.occurrent', module='durable-subscription', version='0.19.8') 
[org.occurrent/durable-subscription "0.19.8"]
'org.occurrent:durable-subscription:jar:0.19.8'
<dependency org="org.occurrent" name="durable-subscription" rev="0.19.8" />

Then we should instantiate a PositionAwareSubscriptionModel, which subscribes to the events from the event store, and an instance of a SubscriptionPositionStorage, which stores the subscription position, and combine them into a DurableSubscriptionModel:

// Create the non-durable blocking subscription instance 
PositionAwareSubscriptionModel nonDurableSubscriptionModel = ...
// Create the storage
SubscriptionPositionStorage storage = ...

// Now combine the non-durable subscription model and the subscription position storage
SubscriptionModel durableSubscriptionModel = new DurableSubscriptionModel(nonDurableSubscriptionModel, storage);

// Start a subscription
durableSubscriptionModel.subscribe("mySubscriptionId", cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent)); 
// Create the non-durable blocking subscription instance 
val nonDurableSubscriptionModel : PositionAwareSubscriptionModel = ...
// Create the storage
val storage : SubscriptionPositionStorage = ...

// Now combine the non-durable subscription model and the subscription position storage
val durableSubscriptionModel : SubscriptionModel = DurableSubscriptionModel(nonDurableSubscriptionModel, storage)

// Start a subscription
durableSubscriptionModel.subscribe("mySubscriptionId") { cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent) }

Catch-up Subscription (Blocking)

When starting a new subscription it’s often useful to first replay historic events to get up to speed and then subscribe to new events as they arrive. A catch-up subscription allows for exactly this! It combines the EventStoreQueries API with a subscription and an optional subscription storage. It starts off by streaming historic events from the event store and then automatically switches to continuous streaming mode once the historic events have caught up.

To get started you need to add the following dependency:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>catchup-subscription</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:catchup-subscription:0.19.8'
libraryDependencies += "org.occurrent" % "catchup-subscription" % "0.19.8"
@Grab(group='org.occurrent', module='catchup-subscription', version='0.19.8') 
[org.occurrent/catchup-subscription "0.19.8"]
'org.occurrent:catchup-subscription:jar:0.19.8'
<dependency org="org.occurrent" name="catchup-subscription" rev="0.19.8" />

For example:

// Create the subscription position storage. Note that if the PositionAwareSubscriptionModel
// is also storing the position, it's highly recommended to share the same SubscriptionPositionStorage instance.
SubscriptionPositionStorage storage = ...
// Create the subscription instance that will be used once the replay has caught up the latest event position
PositionAwareSubscriptionModel continuousSubscriptionModel = ...
// Instantiate an event store that implements the EventStoreQueries API
EventStoreQueries eventStoreQueries = ... 


// Now combine the continuous subscription model and the subscription position storage to allow
// handing over to the continuous subscription once catch-up phase is completed.
// In this example, we also store the subscription position during catch-up for every third event.
// This is optional, but useful if you're reading a lot of events and don't want to risk restarting 
// from the beginning if the application were to crash during the catch-up phase.
CatchupSubscriptionModel catchupSubscriptionModelModel = new CatchupSubscriptionModel(continuousSubscriptionModel, eventStoreQueries, 
            new CatchupSubscriptionModelConfig(useSubscriptionPositionStorage(storage)
                    .andPersistSubscriptionPositionDuringCatchupPhaseForEveryNEvents(3)));

// Start a subscription that starts replaying events of type "GameEnded" from the beginning of time
catchupSubscriptionModelModel.subscribe("mySubscription", filter(type("GameEnded")), StartAtTime.beginningOfTime(), cloudEvent -> System.out.println(cloudEvent));

// Note that excluding "StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime())" like below would still start at 
// the beginning of time the first time, but subsequent calls will start from the latest position stored in the storage.
// This is recommended if you want to continue using the CatchupSubscriptionModel later when no catch-up is required
// (since the subscription has already caught up).
catchupSubscriptionModelModel.subscribe("mySubscription", filter(type("GameEnded")), cloudEvent -> System.out.println(cloudEvent));

// Create the subscription position storage. Note that if the PositionAwareSubscriptionModel
// is also storing the position, it's highly recommended to share the same SubscriptionPositionStorage instance.
val storage : SubscriptionPositionStorage = ...
// Create the subscription instance that will be used once the replay has caught up the latest event position
val continuousSubscription : PositionAwareSubscriptionModel = ...
// Instantiate an event store that implements the EventStoreQueries API
val eventStoreQueries : EventStoreQueries = ... 


// Now combine the continuous subscription model and the subscription position storage to allow
// handing over to the continuous subscription once catch-up phase is completed.
// In this example, we also store the subscription position during catch-up for every third event.
// This is optional, but useful if you're reading a lot of events and don't want to risk restarting 
// from the beginning if the application were to crash during the catch-up phase. If you don't
// use "andPersistSubscriptionPositionDuringCatchupPhaseForEveryNEvents" but just "useSubscriptionPositionStorage(storage)"
// then the CatchupSubscriptionModel will still start from the position in the storage, but not write to it.
// The continuous subscription (passed as first parameter to CatchupSubscriptionModel) might write to the store, 
// which means that once the CatchupSubscriptionModel has caught up and the continuous subscription starts
// writing the position, the CatchupSubscriptionModel will just delegate to continuous subscription if it finds
// a position in the storage.           
val catchupSubscriptionModel = CatchupSubscriptionModel(continuousSubscription, eventStoreQueries, 
            CatchupSubscriptionModelConfig(useSubscriptionPositionStorage(storage)
                    .andPersistSubscriptionPositionDuringCatchupPhaseForEveryNEvents(3)))

// Start a subscription that starts replaying events of type "GameEnded" from the beginning of time
catchupSubscriptionModel.subscribe("mySubscription", filter(type("GameEnded")), StartAt.beginningOfTime()) { cloudEvent -> 
    println(cloudEvent)
}

// Note that excluding "StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime())" like below would still start at 
// the beginning of time the first time, but subsequent calls will start from the latest position stored in the storage.
// This is recommended if you want to continue using the CatchupSubscriptionModel later when no catch-up is required
// (since the subscription has already caught up).
catchupSubscriptionModel.subscribe("mySubscription", filter(type("GameEnded"))) { cloudEvent -> 
    println(cloudEvent)
}

When switching from replay mode to continuous mode, there is a chance that an event written exactly when the switch takes place could be lost. To prevent this, the continuous mode subscription starts at a position before the last event read from the history, which in turn means that some events may be delivered twice. To reduce the likelihood of such duplicates, a CatchupSubscriptionModel maintains an in-memory cache of event ids and uses it to filter away events that are detected as duplicates during the switch. The size of this cache is configurable using a CatchupSubscriptionModelConfig, but it defaults to 100. If the cache is too small, duplicate events will be sent to the continuous subscription. Typically, you want your application to be idempotent anyway, and if so this shouldn’t be a problem.
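
The effect of the cache size can be illustrated with a small standalone sketch. It is not the CatchupSubscriptionModel implementation; DedupCacheSketch, RecentIds and replayThenContinue are hypothetical names, events are reduced to their ids, and the oldest-first eviction policy is an assumption. The continuous list deliberately overlaps with the end of the historic list, mimicking a continuous subscription that starts slightly before the last replayed event.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class DedupCacheSketch {

    // Bounded set of recently seen event ids; the oldest id is evicted when the set is full.
    static final class RecentIds {
        private final Set<String> ids;

        RecentIds(int maxSize) {
            this.ids = Collections.newSetFromMap(new LinkedHashMap<String, Boolean>() {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > maxSize;
                }
            });
        }

        // Remembers the id and returns true if it was not already in the cache.
        boolean markSeen(String id) {
            return ids.add(id);
        }
    }

    // Delivers all historic ids, then only those continuous ids that the cache does not recognize.
    static List<String> replayThenContinue(List<String> historic, List<String> continuous, int cacheSize) {
        RecentIds recent = new RecentIds(cacheSize);
        List<String> delivered = new ArrayList<>();
        for (String id : historic) {
            recent.markSeen(id);
            delivered.add(id);
        }
        for (String id : continuous) {
            if (recent.markSeen(id)) {
                delivered.add(id);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> historic = List.of("a", "b", "c", "d");
        List<String> continuous = List.of("c", "d", "e", "f");
        System.out.println(replayThenContinue(historic, continuous, 100)); // prints [a, b, c, d, e, f]
        System.out.println(replayThenContinue(historic, continuous, 1));   // prints [a, b, c, d, c, d, e, f]
    }
}
```

With a cache large enough to cover the overlap, "c" and "d" are filtered out; with a cache of size 1 they slip through as duplicates, which is where idempotent event handling helps.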

A CatchupSubscriptionModel can be configured to store the subscription position in the supplied storage (see example above) so that, if the application crashes during replay mode, it doesn’t need to start replaying from the beginning again. Note that if you don’t want subscription persistence during replay, you can disable this by doing new CatchupSubscriptionModelConfig(dontUseSubscriptionPositionStorage()).

It’s also possible to change how the CatchupSubscriptionModel sorts events read from the event store during the catch-up phase. For example:

var subscriptionModel = ...
var eventStore = ...
var cfg = new CatchupSubscriptionModelConfig(100).catchupPhaseSortBy(SortBy.descending(TIME));
var catchupSubscriptionModel = new CatchupSubscriptionModel(subscriptionModel, eventStore, cfg);

By default, events are sorted by time and then stream version (if two or more events have the same time).
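
The default ordering described above (time first, stream version as tie-breaker) corresponds to a two-level comparator. The sketch below shows that ordering with plain JDK types; StoredEvent is a hypothetical, simplified stand-in for an event read from the event store and is not part of Occurrent.

```java
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class CatchupSortSketch {

    // Hypothetical, simplified stand-in for an event read from the event store.
    static final class StoredEvent {
        private final String id;
        private final OffsetDateTime time;
        private final long streamVersion;

        StoredEvent(String id, OffsetDateTime time, long streamVersion) {
            this.id = id;
            this.time = time;
            this.streamVersion = streamVersion;
        }

        String id() { return id; }
        OffsetDateTime time() { return time; }
        long streamVersion() { return streamVersion; }
    }

    // Sort by time, and by stream version when two events share the same time.
    static final Comparator<StoredEvent> TIME_THEN_STREAM_VERSION =
            Comparator.comparing(StoredEvent::time).thenComparingLong(StoredEvent::streamVersion);

    // Returns the event ids in the order the events would be replayed.
    static List<String> idsInCatchupOrder(List<StoredEvent> events) {
        List<StoredEvent> sorted = new ArrayList<>(events);
        sorted.sort(TIME_THEN_STREAM_VERSION);
        List<String> ids = new ArrayList<>();
        for (StoredEvent event : sorted) {
            ids.add(event.id());
        }
        return ids;
    }

    public static void main(String[] args) {
        OffsetDateTime t = OffsetDateTime.of(2024, 2, 3, 10, 4, 2, 0, ZoneOffset.UTC);
        List<StoredEvent> events = List.of(
                new StoredEvent("c", t.plusSeconds(1), 1),
                new StoredEvent("b", t, 2),
                new StoredEvent("a", t, 1));
        System.out.println(idsInCatchupOrder(events)); // prints [a, b, c]
    }
}
```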

Catch-up Subscription Usage

The subscription model will only stream historic events if started with a StartAt instance with a so-called TimeBasedSubscriptionPosition, for example:

subscriptionModel.subscribe("subscriptionId", StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime()), e -> System.out.println("Event: " + e));
subscriptionModel.subscribe("subscriptionId", StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime())) { e -> 
    println("Event: $e")
}

If you don’t specify a StartAt position (or specify StartAt.subscriptionModelDefault() explicitly), the CatchupSubscriptionModel will just delegate to the parent subscription model and replay of old events will not happen. This means that for a subscription you can start it off by e.g. replaying from beginning of time then change the code and remove the StartAt position. It’ll then resume from the position of the last consumed event.

There are also some “shortcuts” to make it a bit more concise to start replay from beginning of time:

var subscriptionModel = new CatchupSubscriptionModel(..);
// All examples below are equivalent:
subscriptionModel.subscribeFromBeginningOfTime("subscriptionId", e -> System.out.println("Event: " + e));
subscriptionModel.subscribe("subscriptionId", StartAtTime.beginningOfTime(), e -> System.out.println("Event: " + e));
subscriptionModel.subscribe("subscriptionId", StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime()), e -> System.out.println("Event: " + e));
val subscriptionModel = CatchupSubscriptionModel(..)
// All examples below are equivalent:
subscriptionModel.subscribeFromBeginningOfTime("subscriptionId") { e -> println("Event: $e") }
subscriptionModel.subscribe("subscriptionId", StartAtTime.beginningOfTime()) { e -> println("Event: $e") }
subscriptionModel.subscribe("subscriptionId", StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime())) { e -> println("Event: $e") }
// beginningOfTime is an extension function imported from org.occurrent.subscription.blocking.durable.catchup.CatchupSubscriptionModelExtensions.kt  
subscriptionModel.subscribe("subscriptionId", StartAt.beginningOfTime()) { e -> println("Event: $e") }

It’s also possible to start from a specific java.time.OffsetDateTime, for example:

var offsetDateTime = OffsetDateTime.of(2024, 2, 3, 10, 4, 2, 0, ZoneOffset.UTC);
subscriptionModel.subscribe("subscriptionId", StartAtTime.offsetDateTime(offsetDateTime), e -> System.out.println("Event: " + e));
val offsetDateTime = OffsetDateTime.of(2024, 2, 3, 10, 4, 2, 0, ZoneOffset.UTC)
subscriptionModel.subscribe("subscriptionId", StartAt.offsetDateTime(offsetDateTime)) { e -> println("Event: $e") }

Competing Consumer Subscription (Blocking)

A competing consumer subscription model wraps another subscription model to allow several subscribers to subscribe to the same subscription. One of the subscribers will get a lock on the subscription and receive events from it, while the others will be on standby. If a subscriber loses its lock, another subscriber will take over automatically. To achieve distributed locking, the subscription model uses an org.occurrent.subscription.api.blocking.CompetingConsumerStrategy to support different algorithms. You can write custom algorithms by implementing this interface yourself. But to use it, first depend on the CompetingConsumerSubscriptionModel:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>competing-consumer-subscription</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:competing-consumer-subscription:0.19.8'
libraryDependencies += "org.occurrent" % "competing-consumer-subscription" % "0.19.8"
@Grab(group='org.occurrent', module='competing-consumer-subscription', version='0.19.8') 
[org.occurrent/competing-consumer-subscription "0.19.8"]
'org.occurrent:competing-consumer-subscription:jar:0.19.8'
<dependency org="org.occurrent" name="competing-consumer-subscription" rev="0.19.8" />

A CompetingConsumerSubscriptionModel takes a CompetingConsumerStrategy as second parameter. There are currently two different implementations, both are based on MongoDB. Use the following if you’re using the native Java MongoDB driver (i.e. you’re not using Spring):

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-mongodb-native-blocking-competing-consumer-strategy</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-native-blocking-competing-consumer-strategy:0.19.8'
libraryDependencies += "org.occurrent" % "subscription-mongodb-native-blocking-competing-consumer-strategy" % "0.19.8"
@Grab(group='org.occurrent', module='subscription-mongodb-native-blocking-competing-consumer-strategy', version='0.19.8') 
[org.occurrent/subscription-mongodb-native-blocking-competing-consumer-strategy "0.19.8"]
'org.occurrent:subscription-mongodb-native-blocking-competing-consumer-strategy:jar:0.19.8'
<dependency org="org.occurrent" name="subscription-mongodb-native-blocking-competing-consumer-strategy" rev="0.19.8" />

The CompetingConsumerStrategy implementation in this module is called NativeMongoLeaseCompetingConsumerStrategy. If you’re using Spring, depend on this module instead:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-mongodb-spring-blocking-competing-consumer-strategy</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-spring-blocking-competing-consumer-strategy:0.19.8'
libraryDependencies += "org.occurrent" % "subscription-mongodb-spring-blocking-competing-consumer-strategy" % "0.19.8"
@Grab(group='org.occurrent', module='subscription-mongodb-spring-blocking-competing-consumer-strategy', version='0.19.8') 
[org.occurrent/subscription-mongodb-spring-blocking-competing-consumer-strategy "0.19.8"]
'org.occurrent:subscription-mongodb-spring-blocking-competing-consumer-strategy:jar:0.19.8'
<dependency org="org.occurrent" name="subscription-mongodb-spring-blocking-competing-consumer-strategy" rev="0.19.8" />

The CompetingConsumerStrategy implementation in this module is called SpringMongoLeaseCompetingConsumerStrategy and it’s using the MongoTemplate from the Spring ecosystem. Both of these strategies are heavily inspired by the awesome work of Alec Henninger. To understand how these strategies work under the hood, refer to his blog post.

Just like several other subscription models, the CompetingConsumerSubscriptionModel wraps another subscription model and decorates it with additional functionality, in this case to add competing consumer support to it. Below is an example that uses NativeMongoLeaseCompetingConsumerStrategy from module org.occurrent:subscription-mongodb-native-blocking-competing-consumer-strategy with a DurableSubscriptionModel which in turn wraps the Native MongoDB subscription model.

MongoDatabase mongoDatabase = mongoClient.getDatabase("some-database");
SubscriptionPositionStorage positionStorage = new NativeMongoSubscriptionPositionStorage(mongoDatabase, "position-storage");
SubscriptionModel wrappedSubscriptionModel = new DurableSubscriptionModel(new NativeMongoSubscriptionModel(mongoDatabase, "events", TimeRepresentation.DATE), positionStorage);
 
// Create the CompetingConsumerSubscriptionModel
NativeMongoLeaseCompetingConsumerStrategy competingConsumerStrategy = NativeMongoLeaseCompetingConsumerStrategy.withDefaults(mongoDatabase);
CompetingConsumerSubscriptionModel competingConsumerSubscriptionModel = new CompetingConsumerSubscriptionModel(wrappedSubscriptionModel, competingConsumerStrategy);
// Now subscribe!
competingConsumerSubscriptionModel.subscribe("subscriptionId", type("SomeEvent"));
val mongoDatabase = mongoClient.getDatabase("some-database")
val positionStorage = NativeMongoSubscriptionPositionStorage(mongoDatabase, "position-storage")
val wrappedSubscriptionModel = DurableSubscriptionModel(NativeMongoSubscriptionModel(mongoDatabase, "events", TimeRepresentation.DATE), positionStorage)
 
// Create the CompetingConsumerSubscriptionModel
val competingConsumerStrategy = NativeMongoLeaseCompetingConsumerStrategy.withDefaults(mongoDatabase)
val competingConsumerSubscriptionModel = CompetingConsumerSubscriptionModel(wrappedSubscriptionModel, competingConsumerStrategy)
// Now subscribe!
competingConsumerSubscriptionModel.subscribe("subscriptionId", type("SomeEvent"))

If the above code is executed on multiple nodes/processes, then only one subscriber will receive events.

Note that you can make several tweaks to the CompetingConsumerStrategy using the Builder (new NativeMongoLeaseCompetingConsumerStrategy.Builder() or new SpringMongoLeaseCompetingConsumerStrategy.Builder()). You can, for example, tweak how long the lease time should be for the lock (default is 20 seconds), the name of the lease collection in MongoDB, as well as the retry strategy and other things.
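
To make the lease idea more concrete, here is a minimal, in-memory sketch of how a lease-based lock can decide which subscriber is active. It is not Occurrent's implementation (the real strategies keep the lease in MongoDB). The LeaseSketch class, its tryAcquire method and the explicit clock parameter are assumptions made purely for illustration.

```java
import java.time.Duration;

// Hypothetical, in-memory illustration of a lease-based lock.
// Not Occurrent's API: the real strategies keep the lease in MongoDB.
public class LeaseSketch {
    private final long leaseMillis;
    private String owner;          // subscriber currently holding the lease, or null
    private long expiresAtMillis;  // time at which the current lease expires

    public LeaseSketch(Duration leaseTime) {
        this.leaseMillis = leaseTime.toMillis();
    }

    // Try to acquire (or renew) the lease for the given subscriber at time "nowMillis".
    // Returns true if the subscriber holds the lease afterwards.
    public synchronized boolean tryAcquire(String subscriberId, long nowMillis) {
        boolean free = owner == null || nowMillis >= expiresAtMillis;
        if (free || owner.equals(subscriberId)) {
            owner = subscriberId;
            expiresAtMillis = nowMillis + leaseMillis;
            return true;
        }
        return false; // someone else holds a valid lease, so remain in standby
    }

    public static void main(String[] args) {
        LeaseSketch lease = new LeaseSketch(Duration.ofSeconds(20));
        System.out.println(lease.tryAcquire("node-a", 0));       // true: node-a gets the lease
        System.out.println(lease.tryAcquire("node-b", 5_000));   // false: node-b is in standby
        System.out.println(lease.tryAcquire("node-a", 10_000));  // true: node-a renews until 30s
        System.out.println(lease.tryAcquire("node-b", 31_000));  // true: lease expired, node-b takes over
    }
}
```

In this sketch a subscriber keeps its lock by renewing the lease before it expires. If it stops renewing (for example because the process crashed), a standby subscriber acquires the lease on its next attempt.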

Subscription Life-cycle & Testing (Blocking)

Subscription models may also implement the SubscriptionLifeCycle interface (currently all blocking subscription models implement this). These subscription models support canceling, pausing and resuming individual subscriptions. You can also stop an entire subscription model temporarily (stop) and restart it later (start).

Note the difference between canceling and pausing a subscription. Canceling a subscription will remove it and it’s not possible to resume it again later. Pausing a subscription will temporarily pause the subscription, but it can later be resumed using the resumeSubscription method.

Many of the methods in the SubscriptionLifeCycle are good to have when you write integration tests. It’s often useful to e.g. write events to the event store without triggering all subscriptions listening to the events. The life cycle methods allow you to selectively start/stop individual subscriptions so that you can (integration) test them in isolation.
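
The difference between pausing and canceling can be illustrated with a small toy model. The LifeCycleSketch class below is hypothetical and only mimics the semantics described above. It is not Occurrent's SubscriptionLifeCycle interface, and apart from resumeSubscription and cancelSubscription the method names are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of the life cycle semantics described above.
// It is not Occurrent's SubscriptionLifeCycle interface, only an illustration.
public class LifeCycleSketch {
    enum Status { RUNNING, PAUSED }

    private final Map<String, Status> subscriptions = new HashMap<>();

    public void subscribe(String id) {
        subscriptions.put(id, Status.RUNNING);
    }

    // Pausing keeps the subscription registered so that it can be resumed later
    public void pauseSubscription(String id) {
        subscriptions.replace(id, Status.PAUSED);
    }

    public void resumeSubscription(String id) {
        if (!subscriptions.containsKey(id)) {
            throw new IllegalArgumentException("Subscription " + id + " doesn't exist (was it canceled?)");
        }
        subscriptions.put(id, Status.RUNNING);
    }

    // Canceling removes the subscription, so it cannot be resumed afterwards
    public void cancelSubscription(String id) {
        subscriptions.remove(id);
    }

    public boolean isRunning(String id) {
        return subscriptions.get(id) == Status.RUNNING;
    }

    public static void main(String[] args) {
        LifeCycleSketch model = new LifeCycleSketch();
        model.subscribe("mySubscriptionId");
        model.pauseSubscription("mySubscriptionId");
        model.resumeSubscription("mySubscriptionId");     // fine, the subscription was only paused
        model.cancelSubscription("mySubscriptionId");
        try {
            model.resumeSubscription("mySubscriptionId"); // fails, the subscription was removed
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```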

Reactive Subscriptions

A “reactive subscription” is a subscription that uses non-blocking IO when reading events from the event store, i.e. reading changes from an EventStore will not block a thread. It uses concepts from reactive programming, which is well-suited for working with streams of data. This is arguably a bit more complex for the typical Java developer, and you should consider using blocking subscriptions if high throughput and low CPU and memory consumption are not critical.

To create a reactive subscription you first need to choose which “subscription model” to use. Then you create a subscription instance from this subscription model. All reactive subscriptions implement the org.occurrent.subscription.api.reactor.SubscriptionModel interface, which uses components from project reactor. This interface provides means to subscribe to new events from an EventStore as they are written. For example:

subscriptionModel.subscribe("mySubscriptionId").doOnNext(System.out::println).subscribe();
subscriptionModel.subscribe("mySubscriptionId").doOnNext(::println).subscribe()
The "subscribe" method returns an instance of "Flux<CloudEvent>".

This will simply print each cloud event written to the event store to the console.

Note that the signature of subscribe is defined like this:

public interface SubscriptionModel {

    /**
     * Stream events from the event store as they arrive. Use this method if you want to start streaming from a specific position.
     *
     * @return A Flux with cloud events which may also include the SubscriptionPosition that can be used to resume the stream from the current position.
     */
    Flux<CloudEvent> subscribe(SubscriptionFilter filter, StartAt startAt);

    // Default methods 

}

It’s common that subscriptions produce “wrappers” around the vanilla io.cloudevents.CloudEvent type that include the subscription position (if the datastore doesn’t maintain the subscription position on behalf of the clients). Someone, either you as the client or the datastore, needs to keep track of this position for each individual subscriber (“mySubscriptionId” in the example above). If the datastore doesn’t provide this feature, you should use a SubscriptionModel implementation that also implements the org.occurrent.subscription.api.reactor.PositionAwareSubscriptionModel interface. The PositionAwareSubscriptionModel is an example of a SubscriptionModel that returns a wrapper around io.cloudevents.CloudEvent called org.occurrent.subscription.PositionAwareCloudEvent which adds an additional method, SubscriptionPosition getStreamPosition(), that you can use to get the current subscription position. You can check if a cloud event contains a subscription position by calling PositionAwareCloudEvent.hasSubscriptionPosition(cloudEvent) and then get the position by using PositionAwareCloudEvent.getSubscriptionPositionOrThrowIAE(cloudEvent). Note that PositionAwareCloudEvent is fully compatible with io.cloudevents.CloudEvent and it’s ok to treat it as such. So given that you’re subscribing from a PositionAwareSubscriptionModel, you are responsible for keeping track of the subscription position, so that it’s possible to resume this subscription from the last known position on application restart. This interface also provides means to get the so-called “current global subscription position”, by calling the globalSubscriptionPosition method, which can be useful when starting a new subscription.

For example, consider the case when subscription “A” starts subscribing at the current time (T1). Event E1 is written to the EventStore and propagated to subscription “A”. But imagine there’s a bug in “A” that prevents it from performing its action. Later, the bug is fixed and the application is restarted at the “current time” (T2). But since T2 is after T1, E1 will not be sent to “A” again since it happened before T2. Thus this event is missed! Whether or not this is actually a problem depends on your use case. But to avoid it you should not start the subscription at the “current time”, but rather from the “global subscription position”. This position should be written to a subscription position storage before subscription “A” is started. Thus the subscription can continue from this position on application restart and no events will be missed.
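
The scenario can be simulated in a few lines of plain Java. The sketch below is hypothetical and does not use any Occurrent APIs: a list stands in for the event store, list indexes stand in for subscription positions, and a map stands in for the subscription position storage.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of the scenario above, using list indexes as positions.
// None of these names are Occurrent APIs.
public class MissedEventSketch {
    private final List<String> eventLog = new ArrayList<>();               // stand-in for the event store
    private final Map<String, Integer> positionStorage = new HashMap<>();  // stand-in for a position storage

    public void write(String event) {
        eventLog.add(event);
    }

    // Stand-in for the "global subscription position": the position right after the last written event
    public int globalPosition() {
        return eventLog.size();
    }

    public void savePosition(String subscriptionId, int position) {
        positionStorage.put(subscriptionId, position);
    }

    // Events delivered when (re)starting at the stored position, or at "now" if nothing is stored
    public List<String> eventsOnRestart(String subscriptionId) {
        int start = positionStorage.getOrDefault(subscriptionId, globalPosition());
        return new ArrayList<>(eventLog.subList(start, eventLog.size()));
    }

    public static void main(String[] args) {
        // Subscription "A" starts at the current time without storing any position
        MissedEventSketch withoutStoredPosition = new MissedEventSketch();
        withoutStoredPosition.write("E1"); // "A" fails to process E1 because of a bug
        System.out.println(withoutStoredPosition.eventsOnRestart("A")); // prints [] since E1 is before "now"

        // Subscription "A" stores the global position before it starts
        MissedEventSketch withStoredPosition = new MissedEventSketch();
        withStoredPosition.savePosition("A", withStoredPosition.globalPosition());
        withStoredPosition.write("E1"); // "A" fails again, so the stored position is not advanced
        System.out.println(withStoredPosition.eventsOnRestart("A")); // prints [E1], the event is delivered again
    }
}
```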

Reactive Subscription Filters

You can also provide a subscription filter, applied at the datastore level so that it’s really efficient, if you’re only interested in certain events:

subscriptionModel.subscribe("mySubscriptionId", filter(type("GameEnded"))).doOnNext(System.out::println).subscribe();
subscriptionModel.subscribe("mySubscriptionId", filter(type("GameEnded"))).doOnNext(::println).subscribe()

This will print each cloud event that is written to the event store and has a type equal to “GameEnded” to the console. The filter method is statically imported from org.occurrent.subscription.OccurrentSubscriptionFilter and type is statically imported from org.occurrent.condition.Condition. The OccurrentSubscriptionFilter is generic and should be applicable to a wide variety of different datastores. However, subscription implementations may provide different means to express filters. For example, the MongoDB subscription implementations allow you to use filters specific to MongoDB:

subscriptionModel.subscribe("mySubscriptionId", filter().id(Filters::eq, "3c0364c3-f4a7-40d3-9fb8-a4a62d7f66e3").type(Filters::eq, "GameStarted")).doOnNext(System.out::println).subscribe();
subscriptionModel.subscribe("mySubscriptionId", filter().id(Filters::eq, "3c0364c3-f4a7-40d3-9fb8-a4a62d7f66e3").type(Filters::eq, "GameStarted")).doOnNext(::println).subscribe()

Now filter is statically imported from org.occurrent.subscription.mongodb.MongoDBFilterSpecification and Filters is imported from com.mongodb.client.model.Filters (i.e. the normal way to express filters in MongoDB). However, it’s recommended to always start with an OccurrentSubscriptionFilter and only pick a more specific implementation if you cannot express your filter using the capabilities of OccurrentSubscriptionFilter.

Reactive Subscription Start Position

A subscription can be started at different locations in the event store. You can define where to start when a subscription is started. This is done by supplying an org.occurrent.subscription.StartAt instance. It provides several ways to specify the start position, either by using StartAt.now(), StartAt.subscriptionModelDefault() (default if StartAt is not defined when calling the subscribe function), or StartAt.subscriptionPosition(<subscriptionPosition>), where <subscriptionPosition> is a datastore-specific implementation of the org.occurrent.subscription.SubscriptionPosition interface which provides the start position as a String. You may want to store the String returned by a SubscriptionPosition in a database so that it’s possible to resume a subscription from the last processed position on application restart. You can do this any way you like, but for most cases you should probably consider whether there’s a Subscription Position Storage available that suits your needs. If not, you can still have a look at them for inspiration on how to write your own.

Reactive Subscription Position Storage

It’s very common that an application needs to start at its last known location in the subscription stream when it’s restarted. While you’re free to store the subscription position provided by a reactive subscription any way you like, Occurrent provides an interface called org.occurrent.subscription.api.reactor.SubscriptionPositionStorage that acts as a uniform abstraction for this purpose. A ReactorSubscriptionPositionStorage is defined like this:

public interface ReactorSubscriptionPositionStorage {
    Mono<SubscriptionPosition> read(String subscriptionId);
    Mono<SubscriptionPosition> save(String subscriptionId, SubscriptionPosition subscriptionPosition);
    Mono<Void> delete(String subscriptionId);
}

I.e. it’s a way to read/write/delete the SubscriptionPosition for a given subscription. Occurrent ships one pre-defined reactive implementation (please contribute!):

1. ReactorSubscriptionPositionStorage
Uses the project reactor driver to store SubscriptionPositions in MongoDB.

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-mongodb-spring-reactor-position-storage</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-spring-reactor-position-storage:0.19.8'
libraryDependencies += "org.occurrent" % "subscription-mongodb-spring-reactor-position-storage" % "0.19.8"
@Grab(group='org.occurrent', module='subscription-mongodb-spring-reactor-position-storage', version='0.19.8') 
[org.occurrent/subscription-mongodb-spring-reactor-position-storage "0.19.8"]
'org.occurrent:subscription-mongodb-spring-reactor-position-storage:jar:0.19.8'
<dependency org="org.occurrent" name="subscription-mongodb-spring-reactor-position-storage" rev="0.19.8" />

If you want to roll your own implementation (feel free to contribute to the project if you do) you can depend on the “reactive subscription API” which contains the ReactorSubscriptionPositionStorage interface:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-api-reactor</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:subscription-api-reactor:0.19.8'
libraryDependencies += "org.occurrent" % "subscription-api-reactor" % "0.19.8"
@Grab(group='org.occurrent', module='subscription-api-reactor', version='0.19.8') 
[org.occurrent/subscription-api-reactor "0.19.8"]
'org.occurrent:subscription-api-reactor:jar:0.19.8'
<dependency org="org.occurrent" name="subscription-api-reactor" rev="0.19.8" />
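
As a rough idea of what a custom storage could look like, here is a hypothetical in-memory variant. To keep the sketch runnable with the JDK alone, CompletableFuture stands in for Reactor's Mono and a plain String stands in for SubscriptionPosition, so the class below does not implement the real ReactorSubscriptionPositionStorage interface. A real implementation would return Mono instances and persist the positions in a durable datastore.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory position storage. CompletableFuture stands in for Reactor's Mono
// and String stands in for SubscriptionPosition, so this does not implement the real interface.
public class InMemoryPositionStorageSketch {
    private final Map<String, String> positions = new ConcurrentHashMap<>();

    // Completes with the stored position, or with null if none has been saved
    public CompletableFuture<String> read(String subscriptionId) {
        return CompletableFuture.completedFuture(positions.get(subscriptionId));
    }

    // Stores the (non-null) position and completes with the value that was saved
    public CompletableFuture<String> save(String subscriptionId, String position) {
        positions.put(subscriptionId, position);
        return CompletableFuture.completedFuture(position);
    }

    // Removes the position for the subscription, if any
    public CompletableFuture<Void> delete(String subscriptionId) {
        positions.remove(subscriptionId);
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        InMemoryPositionStorageSketch storage = new InMemoryPositionStorageSketch();
        storage.save("mySubscriptionId", "position-42").join();
        System.out.println(storage.read("mySubscriptionId").join()); // the saved position
        storage.delete("mySubscriptionId").join();
        System.out.println(storage.read("mySubscriptionId").join()); // null after deletion
    }
}
```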

Reactive Subscription Implementations

These are the non-durable reactive subscription implementations:

MongoDB

It's important to recognize that MongoDB subscriptions are using the oplog so you need to make sure that you have enough oplog capacity to support your use case. You can also read more about this here. Typically this shouldn't be a problem, but if you have subscribers that risk falling behind, you may consider piping the events to e.g. Kafka and leverage it for these types of subscriptions. Note that you can always use a catch-up subscription to recover if this happens.

By “non-durable” we mean implementations that don’t store the subscription position in a durable storage automatically. It might be that the datastore does this automatically or that subscription position storage is not required for your use case. If the datastore doesn’t support storing the subscription position automatically, a subscription will typically implement the org.occurrent.subscription.api.reactor.PositionAwareSubscriptionModel interface (since these types of subscriptions need to be aware of the position). However, you can do this any way you like.

Typically, if you want the stream to continue where it left off on application restart, you want to store away the subscription position. You can do this any way you like, but for most cases you probably want to look into implementations of org.occurrent.subscription.api.reactor.SubscriptionPositionStorage. These subscriptions can be combined with a subscription position storage implementation to store the position in a durable datastore.

Occurrent provides a utility that combines a PositionAwareSubscriptionModel and a ReactorSubscriptionPositionStorage (see here) to automatically store the subscription position after each processed event. If you don’t want the position to be persisted after every event, it’s recommended to pick a subscription position storage implementation, and store the position yourself when you find fit.

Reactive Subscription using Spring ReactiveMongoTemplate

An implementation that uses Spring’s ReactiveMongoTemplate for event subscriptions.

First include the following dependency:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-mongodb-spring-reactor</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-spring-reactor:0.19.8'
libraryDependencies += "org.occurrent" % "subscription-mongodb-spring-reactor" % "0.19.8"
@Grab(group='org.occurrent', module='subscription-mongodb-spring-reactor', version='0.19.8') 
[org.occurrent/subscription-mongodb-spring-reactor "0.19.8"]
'org.occurrent:subscription-mongodb-spring-reactor:jar:0.19.8'
<dependency org="org.occurrent" name="subscription-mongodb-spring-reactor" rev="0.19.8" />

Then create a new instance of ReactorMongoSubscriptionModel and start subscribing:

ReactiveMongoTemplate reactiveMongoTemplate = ...
// Create the reactive subscription model
SubscriptionModel subscriptionModel = new ReactorMongoSubscriptionModel(reactiveMongoTemplate, "eventCollectionName", TimeRepresentation.RFC_3339_STRING);

// Now you can create subscription instances that subscribe to new events as they're written to an EventStore
// Typically you do this after the Spring application context has finished loading. For example by subscribing to 
// the (org.springframework.boot.context.event.ApplicationStartedEvent) or in a method annotated with (@PostConstruct) 
subscriptionModel.subscribe("mySubscriptionId").flatMap(cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent)).subscribe(); 

// You can later cancel the subscription by calling:
subscriptionModel.cancelSubscription("mySubscriptionId");
val reactiveMongoTemplate : ReactiveMongoTemplate = ... 
// Create the reactive subscription model
val subscriptionModel = ReactorMongoSubscriptionModel(reactiveMongoTemplate, "eventCollectionName", TimeRepresentation.RFC_3339_STRING)

// Now you can create subscription instances that subscribe to new events as they're written to an EventStore
subscriptionModel.subscribe("mySubscriptionId").flatMap { cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent) }.subscribe()

// You can later cancel the subscription by calling:
subscriptionModel.cancelSubscription("mySubscriptionId")
ReactorMongoSubscriptionModel can be imported from the "org.occurrent.subscription.mongodb.spring.reactor" package.

The “eventCollectionName” specifies the event collection in MongoDB where events are stored. It’s important that this collection is the same as the collection used by the EventStore implementation. Secondly, we have the TimeRepresentation.RFC_3339_STRING that is passed as the third constructor argument, which you can read more about here. It’s also very important that this is configured the same way as the EventStore.

It should also be noted that Spring takes care of re-attaching to MongoDB if there’s a connection issue or other transient errors. This can be configured when creating the ReactiveMongoTemplate instance.

Note that you can provide a filter, start position and position persistence for this subscription implementation.

Durable Subscriptions (Reactive)

Storing the subscription position is useful if you need to resume a subscription from its last known position when restarting an application. Occurrent provides a utility that combines a PositionAwareSubscriptionModel and a ReactorSubscriptionPositionStorage implementation (see here) to automatically store the subscription position, by default, after each processed event. If you don’t want the position to be persisted after every event, you can control how often this should happen by supplying a predicate to ReactorDurableSubscriptionModelConfig. There’s a pre-defined predicate, org.occurrent.subscription.util.predicate.EveryN, that allows the position to be stored for every nth event instead of simply every event. There’s also a shortcut, e.g. new ReactorDurableSubscriptionModelConfig(3), that creates an instance of EveryN that stores the position for every third event.
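
To illustrate the idea behind such a predicate, here is a small, hypothetical re-implementation in plain Java. The EveryNthSketch class is an assumption made for this example. Occurrent's own EveryN may be implemented differently and may operate on a different input type.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

// Hypothetical re-implementation of an "every n" predicate, for illustration only.
// Occurrent's own EveryN class may be implemented differently.
public class EveryNthSketch<T> implements Predicate<T> {
    private final int n;
    private final AtomicLong counter = new AtomicLong();

    public EveryNthSketch(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be at least 1");
        }
        this.n = n;
    }

    // Returns true for every nth invocation, i.e. when the position should be persisted
    @Override
    public boolean test(T ignored) {
        return counter.incrementAndGet() % n == 0;
    }

    public static void main(String[] args) {
        Predicate<String> everyThird = new EveryNthSketch<>(3);
        for (int i = 1; i <= 6; i++) {
            // Only events 3 and 6 result in "true"
            System.out.println("event " + i + " -> persist position: " + everyThird.test("event" + i));
        }
    }
}
```

Persisting less often reduces the number of writes to the position storage, at the cost of re-processing up to n - 1 events after a restart.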

To use it, first add the following dependency:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>reactor-durable-subscription</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:reactor-durable-subscription:0.19.8'
libraryDependencies += "org.occurrent" % "reactor-durable-subscription" % "0.19.8"
@Grab(group='org.occurrent', module='reactor-durable-subscription', version='0.19.8') 
[org.occurrent/reactor-durable-subscription "0.19.8"]
'org.occurrent:reactor-durable-subscription:jar:0.19.8'
<dependency org="org.occurrent" name="reactor-durable-subscription" rev="0.19.8" />

Then we should instantiate a PositionAwareSubscriptionModel, that subscribes to the events from the event store, and an instance of a ReactorSubscriptionPositionStorage, that stores the subscription position, and combine them to a ReactorDurableSubscriptionModel:

// Create the non-durable reactive subscription instance 
PositionAwareSubscriptionModel nonDurableSubscription = ...
// Create the storage
ReactorSubscriptionPositionStorage storage = ...

// Now combine the non-durable subscription and the subscription position storage
ReactorDurableSubscriptionModel durableSubscription = new ReactorDurableSubscriptionModel(nonDurableSubscription, storage);

// Start a subscription
durableSubscription.subscribe("mySubscriptionId", cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent)).subscribe(); 
// Create the non-durable reactive subscription instance 
val nonDurableSubscription : PositionAwareSubscriptionModel = ...
// Create the storage
val storage : ReactorSubscriptionPositionStorage = ...

// Now combine the non-durable subscription and the subscription position storage
val durableSubscription = ReactorDurableSubscriptionModel(nonDurableSubscription, storage)

// Start a subscription
durableSubscription.subscribe("mySubscriptionId") { cloudEvent -> 
    doSomethingWithTheCloudEvent(cloudEvent) 
}.subscribe()

Decider

As of version 0.17.0, Occurrent has basic support for Deciders. A decider is a model that can be used as a structured way of implementing decision logic for a business entity (typically an aggregate) or use case/command. Some benefits of using deciders are:

  1. You don’t need to implement any folding of events to state yourself.
  2. You get a good structure for defining your aggregate/use case.
  3. A decider can return either the new events, the new state, or both events and state (called Decision in Occurrent), for a specific command.
  4. Occurrent’s decider implementation supports sending multiple commands to a decider atomically.
  5. Deciders are also combinable, however this feature is not yet available in Occurrent.

To use a decider, you need to model your commands as explicit data structures instead of functions.

To create a decider, first include the dependency:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>decider</artifactId>
    <version>0.19.8</version>
</dependency>
compile 'org.occurrent:decider:0.19.8'
libraryDependencies += "org.occurrent" % "decider" % "0.19.8"
@Grab(group='org.occurrent', module='decider', version='0.19.8') 
[org.occurrent/decider "0.19.8"]
'org.occurrent:decider:jar:0.19.8'
<dependency org="org.occurrent" name="decider" rev="0.19.8" />

You can then either implement the org.occurrent.dsl.decider.Decider interface or use the default implementation (see more below). The interface is defined like this:

public interface Decider<C, S, E> {
    S initialState();

    @NotNull
    List<E> decide(@NotNull C command, S state);

    S evolve(S state, @NotNull E event);

    default boolean isTerminal(S state) {
        return false;
    }
}

where:

Parameter Type | Description
C | The type of the commands that the decider can handle
S | The state that the decider works with
E | The type of events that the decider returns

The interface contains four methods:

Method name | Description
initialState | Returns the initial state of the decider, for example null or something like “NotStarted” (a domain specific state implemented by you), depending on your domain
decide | A function that takes a command and the current state and returns a list of new events that represent the changes that occurred after the command was handled
evolve | A method that takes the current state and an event, and returns an updated state after applying this event
isTerminal | An optional method that can be implemented/overridden to tell the Decider to stop evolving the state if the Decider has reached a specific state

It’s highly recommended to read this blog post to get a better understanding of the rationale behind Deciders.
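
To show how the four methods fit together, here is a minimal, dependency-free sketch. MiniDecider mirrors the shape of the interface above, but it is a hypothetical stand-in and not Occurrent's code. The fold and decideOnEvents helpers, as well as the counter example, are assumptions made for illustration, including the exact point at which isTerminal is checked.

```java
import java.util.List;

// Hypothetical, dependency-free sketch of how a decider can be applied to existing events.
// MiniDecider mirrors the shape of the Decider interface above, but is not Occurrent's code.
public class DeciderSketch {
    public interface MiniDecider<C, S, E> {
        S initialState();
        List<E> decide(C command, S state);
        S evolve(S state, E event);
        default boolean isTerminal(S state) { return false; }
    }

    // Fold the events into a state, stopping early if a terminal state is reached
    public static <C, S, E> S fold(MiniDecider<C, S, E> decider, S state, List<E> events) {
        for (E event : events) {
            if (decider.isTerminal(state)) {
                break;
            }
            state = decider.evolve(state, event);
        }
        return state;
    }

    // Derive the current state from the history, then let the decider decide on the command
    public static <C, S, E> List<E> decideOnEvents(MiniDecider<C, S, E> decider, List<E> history, C command) {
        S currentState = fold(decider, decider.initialState(), history);
        return decider.decide(command, currentState);
    }

    // Example: a counter where the state is the sum, commands are requested increments
    // and events are the increments that were actually accepted
    public static final MiniDecider<Integer, Integer, Integer> COUNTER = new MiniDecider<>() {
        public Integer initialState() { return 0; }
        public List<Integer> decide(Integer amount, Integer state) {
            // Never let the counter exceed 10
            return state + amount > 10 ? List.of() : List.of(amount);
        }
        public Integer evolve(Integer state, Integer added) { return state + added; }
    };

    public static void main(String[] args) {
        System.out.println(decideOnEvents(COUNTER, List.of(3, 4), 2)); // prints [2], since 7 + 2 <= 10
        System.out.println(decideOnEvents(COUNTER, List.of(3, 4), 5)); // prints [], since 7 + 5 > 10
    }
}
```

Note that the caller never folds events into state by hand: the state is always derived from initialState and evolve, which is the first benefit listed above.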

But you don’t actually need to implement this interface yourself, instead you can create a default implementation by passing in functions to Decider.create(..).

Imagine that you have commands, events and state defined like this:

sealed interface Command {
    record Command1(String something, String message) implements Command {
    }
    
    record Command2(String message) implements Command {
    }
}

sealed interface Event {
   record Event1(String something) implements Event {
   }
   
   record Event2(String somethingElse) implements Event {
   }
   
   record Event3(String message) implements Event {
   }
}

record State(String something, String somethingElse, String message) {
    // Other constructors excluded for brevity   
}
sealed interface Command {
    data class Command1(val something : String, val message : String) : Command
    data class Command2(val message : String) : Command
}

sealed interface Event {
   data class Event1(val something : String) : Event
   data class Event2(val somethingElse : String) : Event
   data class Event3(val message : String) : Event
}

data class State(val something : String, val somethingElse : String, val message : String) {
    // Other constructors excluded for brevity   
}

Then you can create a decider like this:

// This example uses Java 21+
var decider = Decider.<Command, State, Event>create(
        null,
        (command, state) -> switch (command) {
            case Command1 c1 -> {
                if (state == null) {
                    yield List.of(new Event1(c1.something()));
                } else {
                    yield List.of(new Event3(c1.message()));
                }
            }
            case Command2 c2 -> List.of(new Event2(c2.message()));
        },
        // The one- and two-argument State constructors are among those excluded for brevity above
        (state, event) -> switch (event) {
            case Event1 e1 -> new State(e1.something());
            case Event2 e2 -> new State(state.something(), e2.somethingElse());
            case Event3 e3 -> new State(state.something(), state.somethingElse(), e3.message());
        }
);

// You can pass an optional Predicate as a fourth argument to Decider.create(..) if you like to specify the "isTerminal" condition, otherwise it always returns false by default.
// Importing this extension function makes creating deciders nicer from Kotlin
import org.occurrent.dsl.decider.decider 

val decider = decider<Command, State?, Event>(
        initialState = null,
        decide = { cmd, state ->
            when (cmd) {
                is Command1 -> listOf(if (state == null) Event1(cmd.something) else Event3(cmd.message))
                is Command2 -> listOf(Event2(cmd.message))
            }
        },
        // The one- and two-argument State constructors are among those excluded for brevity above
        evolve = { state, e ->
            when (e) {
                is Event1 -> State(e.something)
                is Event2 -> State(state!!.something, e.somethingElse)
                is Event3 -> State(state!!.something, state.somethingElse, e.message)
            }
        }
)

// You can also, optionally, define an "isTerminal" predicate as a fourth argument to the decider(..) function if you need to specify this condition, otherwise it always returns false by default.

Now that you have an instance of Decider, you can call any of the many default methods to return either the new state, the new events, or both. For example:

// If you're using an event store to store the events, you can do like this: 

List<Event> currentEvents = ...
Command command = ..

List<Event> newEvents = decider.decideOnEventsAndReturnEvents(currentEvents, command);
State newState = decider.decideOnEventsAndReturnState(currentEvents, command);
// Return both the state and the new events
Decision<State, List<Event>> decision = decider.decideOnEvents(currentEvents, command);

// Or if you store state instead of events:

State currentState = ...
Command command = ..

List<Event> newEvents = decider.decideOnStateAndReturnEvents(currentState, command);
State newState = decider.decideOnStateAndReturnState(currentState, command);
// Return both the state and the new events
Decision<State, List<Event>> decision = decider.decideOnState(currentState, command);

// You can even apply multiple commands at the same time:

List<Event> currentEvents = ...
Command command1 = ..
Command command2 = ..

// Both commands will be applied atomically
List<Event> newEvents = decider.decideOnEventsAndReturnEvents(currentEvents, command1, command2);
// Import the Kotlin extension functions
import org.occurrent.dsl.decider.decide
import org.occurrent.dsl.decider.component1
import org.occurrent.dsl.decider.component2

val currentEvents : List<Event> = ...
val currentState : State? = ...
val command : Command = ...

// We use destructuring here to get the "newState" and "newEvents" from the Decision instance returned by decide
// This is the reason for importing the "component1" and "component2" extension functions above 
val (newState, newEvents) = decider.decide(events = currentEvents, command = command)

// You can also start the computation based on the current state
val (newState, newEvents) = decider.decide(state = currentState, command = command)

// And you could of course also just use the actual "Decision" if you like
val decision : Decision<State, List<Event>> = decider.decide(events = currentEvents, command = command)

// You can also supply multiple commands at the same time, then all of them will succeed or fail atomically
val (newState, newEvents) = decider.decide(currentState, command1, command2)

// You can also use the Java "decide" methods, such as "decideOnStateAndReturnEvents", from Kotlin, but usually it's enough to just use the "org.occurrent.dsl.decider.decide" extension function.

Using an ApplicationService with Deciders

It’s possible to integrate Deciders with an ApplicationService to easily load existing events from an event store.

Java

To use the existing ApplicationService infrastructure with Deciders from Java, you can do like this:

ApplicationService<Event> applicationService = ...
Command command = ...

// Because the decider expects a List<Event>, and not a Stream<Event> as expected by the ApplicationService,
// we first convert the Stream to a List using the "toStreamCommand" function provided by Occurrent.
var writeResult = applicationService.execute("streamId", toStreamCommand(events -> decider.decideOnEventsAndReturnEvents(events, command)));
toStreamCommand can be statically imported from org.occurrent.application.composition.command.toStreamCommand.
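
The adaptation between a List-based and a Stream-based function can be pictured with plain JDK types. The `ListToStreamAdapter.toStreamCommand` helper below is a hypothetical re-implementation for illustration only; Occurrent's own `toStreamCommand` may be implemented differently.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ListToStreamAdapter {
    // Wraps a List-based command so it can be used where a Stream-based command is expected:
    // the incoming Stream is collected to a List, and the resulting List is turned back into a Stream.
    static <T> Function<Stream<T>, Stream<T>> toStreamCommand(Function<List<T>, List<T>> listCommand) {
        return events -> listCommand.apply(events.collect(Collectors.toList())).stream();
    }

    public static void main(String[] args) {
        // A toy List-based command: start the game if nothing has happened yet, otherwise play
        Function<List<String>, List<String>> listCommand =
                events -> events.isEmpty() ? List.of("GameStarted") : List.of("GamePlayed");

        Function<Stream<String>, Stream<String>> streamCommand = toStreamCommand(listCommand);
        System.out.println(streamCommand.apply(Stream.of("GameStarted")).toList()); // prints [GamePlayed]
    }
}
```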

Kotlin

The org.occurrent:decider module contains Kotlin extension functions, located in the org.occurrent.dsl.decider.ApplicationServiceDeciderExtensions.kt file, that allow you to easily integrate deciders with existing ApplicationService infrastructure. Here’s an example:

import org.occurrent.dsl.decider.execute

// Create the application service and decider
val applicationService = ...
val decider = ... 

// Then you can pass the decider and command to the application service instance 
val writeResult = applicationService.execute(streamId, command, decider)

It’s also possible to return the decision, state or new events when calling execute:

import org.occurrent.dsl.decider.executeAndReturnDecision
import org.occurrent.dsl.decider.executeAndReturnState
import org.occurrent.dsl.decider.executeAndReturnEvents

// Invoke the decider with the command and return both state and new events (decision) 
val decision = applicationService.executeAndReturnDecision(streamId, command, decider)
// Invoke the decider with the command and return the new state
val state = applicationService.executeAndReturnState(streamId, command, decider)
// Invoke the decider with the command and return the new events
val newEvents = applicationService.executeAndReturnEvents(streamId, command, decider)

Retry

Retry Configuration (Blocking)

Occurrent contains a retry module that you can depend on using:

Maven

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>retry</artifactId>
    <version>0.19.8</version>
</dependency>

Gradle

compile 'org.occurrent:retry:0.19.8'

SBT

libraryDependencies += "org.occurrent" % "retry" % "0.19.8"

Grape

@Grab(group='org.occurrent', module='retry', version='0.19.8')

Leiningen

[org.occurrent/retry "0.19.8"]

Buildr

'org.occurrent:retry:jar:0.19.8'

Ivy

<dependency org="org.occurrent" name="retry" rev="0.19.8" />

Typically you don't need to depend on this module explicitly, since many of Occurrent's components already use it under the hood and it is thus pulled in transitively.

Occurrent components that support retry (subscription model and subscription position storage implementations) typically accept an instance of org.occurrent.retry.RetryStrategy in their constructors. This allows you to configure how they retry. You can configure max attempts, a retry predicate, an error listener, before/after retry listeners, as well as the backoff strategy. Here’s an example:

RetryStrategy retryStrategy = RetryStrategy
                                    .exponentialBackoff(Duration.ofMillis(100), Duration.ofSeconds(2), 2.0)
                                    .retryIf(UncategorizedSQLException.class::isInstance)
                                    .maxAttempts(5)
                                    .onBeforeRetry(throwable -> log.warn("Caught exception {}, will retry.", throwable.getClass().getSimpleName()))
                                    .onError((info, throwable) -> {
                                        // Only log an error once the last attempt has failed
                                        if (info.isLastAttempt()) {
                                            log.error("Ended with exception {}.", throwable.getClass().getSimpleName());
                                        }
                                    });
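
To get a feel for what an exponential backoff with an initial delay of 100 ms, a max delay of 2 seconds and a multiplier of 2.0 means in practice, here is a small JDK-only sketch. It assumes the common formulation `initial * multiplier^(retry - 1)`, capped at the max delay; the `ExponentialBackoffSketch` class is hypothetical and Occurrent's exact calculation may differ.

```java
import java.time.Duration;

final class ExponentialBackoffSketch {
    // Assumed formula: initial * multiplier^(retry - 1), never exceeding max
    static Duration delayBeforeRetry(int retry, Duration initial, Duration max, double multiplier) {
        double millis = initial.toMillis() * Math.pow(multiplier, retry - 1);
        return Duration.ofMillis((long) Math.min(millis, max.toMillis()));
    }

    public static void main(String[] args) {
        // Prints 100, 200, 400, 800, 1600 and 2000 (one value per line); the sixth delay is capped at the max
        for (int retry = 1; retry <= 6; retry++) {
            Duration delay = delayBeforeRetry(retry, Duration.ofMillis(100), Duration.ofSeconds(2), 2.0);
            System.out.println(delay.toMillis());
        }
    }
}
```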

You can then use a RetryStrategy instance to call methods that you want to be retried on exception by using the execute method:

RetryStrategy retryStrategy = ..
// Retry the method if it throws an exception
retryStrategy.execute(Something::somethingThing);

RetryStrategy is immutable, which means that you can safely do things like this:

RetryStrategy retryStrategy = RetryStrategy.retry().fixed(200).maxAttempts(5);
// Uses default 200 ms fixed delay
retryStrategy.execute(() -> Something.something());
// Use 600 ms fixed delay
retryStrategy.backoff(fixed(600)).execute(() -> SomethingElse.somethingElse());
// 200 ms fixed delay again
retryStrategy.execute(() -> Thing.thing());
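
The reason this is safe is that a configuration method on an immutable object returns a new instance instead of modifying the existing one. The following sketch shows the idea with a hypothetical `RetryConfig` record; it is not Occurrent's RetryStrategy implementation.

```java
final class ImmutableConfigSketch {
    // Hypothetical stand-in for an immutable retry configuration
    record RetryConfig(long fixedDelayMillis, int maxAttempts) {
        // Returns a copy with a different delay; the receiver is left untouched
        RetryConfig fixedDelay(long millis) {
            return new RetryConfig(millis, maxAttempts);
        }
    }

    public static void main(String[] args) {
        RetryConfig base = new RetryConfig(200, 5);
        RetryConfig slower = base.fixedDelay(600);

        System.out.println(base.fixedDelayMillis());   // prints 200
        System.out.println(slower.fixedDelayMillis()); // prints 600
    }
}
```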

You can also disable retries by calling RetryStrategy.none().

As of version 0.11.0 you can also use the mapRetryPredicate function, which lets you map the current retry predicate into a new one. This is useful if you want to extend the existing predicate with an additional condition. For example:

// Let's say you have a retry strategy:
Retry retry = RetryStrategy.exponentialBackoff(Duration.ofMillis(100), Duration.ofSeconds(2), 2.0f).maxAttempts(5).retryIf(WriteConditionNotFulfilledException.class::isInstance);
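// Now you also want to retry if an IllegalArgumentException is thrown.
// Since the retry strategy is immutable, use the instance returned by mapRetryPredicate:
Retry updatedRetry = retry.mapRetryPredicate(currentRetryPredicate -> currentRetryPredicate.or(IllegalArgumentException.class::isInstance));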
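
The predicate mapping itself is ordinary `java.util.function.Predicate` composition. The sketch below shows the idea without Occurrent on the classpath; the `RetryPredicateSketch.mapRetryPredicate` helper is a hypothetical stand-in that simply applies the mapping function to the current predicate.

```java
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

final class RetryPredicateSketch {
    // Hypothetical helper: derive a new retry predicate from the current one
    static Predicate<Throwable> mapRetryPredicate(Predicate<Throwable> current,
                                                  UnaryOperator<Predicate<Throwable>> mapper) {
        return mapper.apply(current);
    }

    public static void main(String[] args) {
        Predicate<Throwable> current = IllegalStateException.class::isInstance;
        Predicate<Throwable> extended =
                mapRetryPredicate(current, p -> p.or(IllegalArgumentException.class::isInstance));

        System.out.println(current.test(new IllegalArgumentException()));  // prints false
        System.out.println(extended.test(new IllegalArgumentException())); // prints true
        System.out.println(extended.test(new IllegalStateException()));    // prints true
    }
}
```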

As of version 0.16.3, RetryStrategy also accepts a function that takes an instance of org.occurrent.retry.RetryInfo. This is useful if you need to know the current state of the retry while retrying. For example:

RetryStrategy retryStrategy = RetryStrategy
                              .exponentialBackoff(initialDelay, maxDelay, 2.0)
                              .maxAttempts(10);
retryStrategy.execute(info -> {
    if (info.getNumberOfAttempts() > 2 && info.getNumberOfAttempts() < 6) {
        System.out.println("Number of attempts is between 3 and 5");
    }
    ...
});

DSLs

Subscription DSL

The subscription DSL is a utility that makes it easier to create subscriptions by using a CloudEventConverter. There’s both a Kotlin DSL and a Java DSL. First you need to depend on the subscription-dsl-blocking module:

Maven

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-dsl-blocking</artifactId>
    <version>0.19.8</version>
</dependency>

Gradle

compile 'org.occurrent:subscription-dsl-blocking:0.19.8'

SBT

libraryDependencies += "org.occurrent" % "subscription-dsl-blocking" % "0.19.8"

Grape

@Grab(group='org.occurrent', module='subscription-dsl-blocking', version='0.19.8')

Leiningen

[org.occurrent/subscription-dsl-blocking "0.19.8"]

Buildr

'org.occurrent:subscription-dsl-blocking:jar:0.19.8'

Ivy

<dependency org="org.occurrent" name="subscription-dsl-blocking" rev="0.19.8" />

If you’re using Kotlin you can then define subscriptions like this:

val subscriptionModel = SpringMongoSubscriptionModel(..)
val cloudEventConverter = GenericCloudEventConverter<DomainEvent>(..)

subscriptions(subscriptionModel, cloudEventConverter) {
    subscribe<GameStarted>("id1") { gameStarted ->
        log.info("Game was started $gameStarted")
    }
    subscribe<GameWon, GameLost>("id2") { domainEvent ->
        log.info("Game was either won or lost: $domainEvent")
    }
    subscribe("everything") { domainEvent ->
        log.info("I subscribe to every event: $domainEvent")
    }
} 

Note that as of version 0.6.0 you can also do:

subscribe<GameStarted> { gameStarted ->
    log.info("Game was started $gameStarted")
}

i.e. you don’t need to specify an id explicitly. Be careful here though, since the name of the subscription will be generated from the event name (the unqualified name, in this case the subscription id would be “GameStarted”). This can lead to trouble if you rename your event because then the id of your subscription will change as well, and it won’t continue from the previous position in the subscription position storage.
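
The renaming pitfall can be shown with a few lines of plain Java. The sketch assumes, for illustration, that the generated id equals the unqualified class name (as returned by `Class#getSimpleName`); the `SubscriptionIdSketch` class and the `GameBegan` event are hypothetical.

```java
final class SubscriptionIdSketch {
    record GameStarted(String gameId) {}
    record GameBegan(String gameId) {} // the same event after a hypothetical rename

    // Assumption for illustration: the default subscription id is the unqualified class name
    static String defaultSubscriptionId(Class<?> eventType) {
        return eventType.getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(defaultSubscriptionId(GameStarted.class)); // prints GameStarted
        System.out.println(defaultSubscriptionId(GameBegan.class));   // prints GameBegan
    }
}
```

Because the two ids differ, a position stored under "GameStarted" would not be found for "GameBegan". Specifying an explicit id avoids this coupling between event names and subscription positions.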

If using Java you can do:

SpringMongoSubscriptionModel subscriptionModel = new SpringMongoSubscriptionModel(..);
GenericCloudEventConverter<DomainEvent> cloudEventConverter = new GenericCloudEventConverter<>(..);

Subscriptions<DomainEvent> subscriptions = new Subscriptions<>(subscriptionModel, cloudEventConverter);

subscriptions.subscribe("gameStarted", GameStarted.class, gameStarted -> {
    log.info("Game was started {}", gameStarted);
});

For this to work, your domain events must all “implement” a DomainEvent interface (or a sealed class in Kotlin). Note that DomainEvent is something you create yourself; it’s not provided by Occurrent.

As of version 0.17.0 you can also get metadata (such as stream version, stream id and all other cloud event extension properties) when consuming an event:

subscriptions.subscribe("gameStarted", GameStarted.class, (metadata, gameStarted) -> {
    long streamVersion = metadata.getStreamVersion();
    String streamId = metadata.getStreamId();
    Map<String, Object> allData = metadata.getData();
    var custom = allData.get("extensionPropertyDefinedInCloudEvent");
    
    // Do stuff
});

subscribe<GameStarted> { metadata, event ->
    val streamVersion = metadata.streamVersion
    val streamId = metadata.streamId
    val allData = metadata.data
    val custom = allData["extensionPropertyDefinedInCloudEvent"]
    
    // Do stuff
}

Query DSL

The “Query DSL” (or “domain query DSL”) is a small wrapper around the EventStoreQueries API that lets you query for domain events instead of CloudEvents. Depend on the org.occurrent:query-dsl-blocking module and create an instance of org.occurrent.dsl.query.blocking.DomainEventQueries. For example:

EventStoreQueries eventStoreQueries = .. 
CloudEventConverter<DomainEvent> cloudEventConverter = ..
DomainEventQueries<DomainEvent> domainQueries = new DomainEventQueries<>(eventStoreQueries, cloudEventConverter);
 
Stream<DomainEvent> events = domainQueries.query(Filter.subject("someSubject"));

There’s also support for skip, limit and sorting, as well as convenience methods for querying for a single event:

Stream<DomainEvent> events = domainQueries.query(GameStarted.class, GameEnded.class); // Find only events of this type
GameStarted event1 = domainQueries.queryOne(GameStarted.class); // Find the first event of this type
GamePlayed event2 = domainQueries.queryOne(Filter.id("d7542cef-ac20-4e74-9128-fdec94540fda")); // Find event with this id

There are also some Kotlin extensions that you can use to query for a Sequence of events instead of a Stream:

 val events : Sequence<DomainEvent> = domainQueries.queryForSequence(GamePlayed::class, GameWon::class, skip = 2) // Find only events of this type and skip the first two events
 val event1 = domainQueries.queryOne<GameStarted>() // Find the first event of this type
 val event2 = domainQueries.queryOne<GamePlayed>(Filter.id("d7542cef-ac20-4e74-9128-fdec94540fda")) // Find event with this id

Spring Boot Starter

Use the “Spring Boot Starter” project to bootstrap Occurrent quickly if using Spring. Add the following to your build script:

Maven

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>spring-boot-starter-mongodb</artifactId>
    <version>0.19.8</version>
</dependency>

Gradle

compile 'org.occurrent:spring-boot-starter-mongodb:0.19.8'

SBT

libraryDependencies += "org.occurrent" % "spring-boot-starter-mongodb" % "0.19.8"

Grape

@Grab(group='org.occurrent', module='spring-boot-starter-mongodb', version='0.19.8')

Leiningen

[org.occurrent/spring-boot-starter-mongodb "0.19.8"]

Buildr

'org.occurrent:spring-boot-starter-mongodb:jar:0.19.8'

Ivy

<dependency org="org.occurrent" name="spring-boot-starter-mongodb" rev="0.19.8" />

Next create a Spring Boot application annotated with @SpringBootApplication as you would normally do, and also add the @EnableOccurrent annotation (located in package org.occurrent.springboot.mongo.blocking). Occurrent will then configure the following components automatically:

You can of course override other beans as well to tailor them to your needs. See the source code of org.occurrent.springboot.mongo.blocking.OccurrentMongoAutoConfiguration if you want to know exactly what gets configured automatically.

It’s also possible to configure certain aspects from the application.yaml file under the “occurrent” namespace. For example:

occurrent:
  application-service:
    enable-default-retry-strategy: false

You can code-complete the available properties in IntelliJ or have a look at org.occurrent.springboot.mongo.blocking.OccurrentProperties to see which configuration properties are supported.

Spring Boot Annotations

If using the Spring Boot Starter you have the option to start subscriptions using the @Subscription annotation (org.occurrent.annotation.Subscription). For example, if you have domain events like these:

public sealed interface DomainEvent permits DomainEvent1, DomainEvent2, DomainEvent3 {
    UUID eventId();
    ZonedDateTime timestamp();
}

public record DomainEvent1(UUID eventId, ZonedDateTime timestamp, String someData1) implements DomainEvent {
}

public record DomainEvent2(UUID eventId, ZonedDateTime timestamp, String someData2) implements DomainEvent {
}

public record DomainEvent3(UUID eventId, ZonedDateTime timestamp, String someData3) implements DomainEvent {
}
sealed interface DomainEvent {
    val eventId: UUID
    val timestamp: ZonedDateTime
}

data class DomainEvent1( override val eventId: UUID, override val timestamp: ZonedDateTime, val someData1: String ) : DomainEvent
data class DomainEvent2( override val eventId: UUID, override val timestamp: ZonedDateTime, val someData2: String ) : DomainEvent
data class DomainEvent3( override val eventId: UUID, override val timestamp: ZonedDateTime, val someData3: String ) : DomainEvent

you can create a subscription to all events like this:

@Component
public class Example {
    
    @Subscription(id = "printAllDomainEvents")
    void printAllDomainEvents(DomainEvent e) {
        System.out.println("Received domain event %s".formatted(e));    
    }
}
@Component
class Example {

    @Subscription(id = "printAllDomainEvents")
    fun printAllDomainEvents(e: DomainEvent) {
        println("Received domain event $e")
    }
}

Note that subscriptions started by the @Subscription annotation make use of competing consumers, so if you run multiple instances of the application, only one of them will receive a given event.

Subscription Start Position

When creating a subscription using the @Subscription annotation you can specify how it should behave when first starting (creating) the subscription as well as how it should be resumed when the application is restarted. Here’s an example:

@Component
public class Example {
    
    @Subscription(id = "printAllDomainEvents", startAt = StartPosition.BEGINNING_OF_TIME)
    void printAllDomainEvents(DomainEvent e) {
        System.out.println("Received domain event %s".formatted(e));    
    }
}
@Component
class Example {

    @Subscription(id = "printAllDomainEvents", startAt = StartPosition.BEGINNING_OF_TIME)
    fun printAllDomainEvents(e: DomainEvent) {
        println("Received domain event $e")
    }
}

This will first replay all historic events from the beginning of time and then continue subscribing to new events continuously. You can also start at a specific date by using startAtISO8601() or startAtTimeEpochMillis().

Note that the example above will start replaying historic events from the beginning of time when the subscription is started the first time. However, once the subscription is resumed, e.g. on application restart, it’ll continue from the last received event.

Here’s a description for each StartPosition:

| StartPosition | Description |
| --- | --- |
| BEGINNING_OF_TIME | Start this subscription from the first event in the event store. |
| NOW | Start this subscription from current time. |
| DEFAULT | Start this subscription using the default behavior of the subscription model. Typically, this means that it’ll start from NOW, unless the subscription has already been started before, in which case the subscription will be started from its last known position. |

If you want a different behavior when the application is restarted, configure a different resumeBehavior() (@Subscription(id="mySubscription", resumeBehavior=..)):

Resume behavior:

| ResumeBehavior | Description |
| --- | --- |
| DEFAULT | Use the default resume behavior of the underlying subscription model. For example, if the StartPosition is set to StartPosition.BEGINNING_OF_TIME, and ResumeBehavior is set to ResumeBehavior.DEFAULT, then the subscription will start from the beginning of time the first time it’s run. Then, on application restart, it’ll continue from the last received event (the subscription position (checkpoint) for the subscription). |
| SAME_AS_START_AT | Always start at the same position as specified by the StartPosition. I.e., even if there’s a subscription position (checkpoint) stored for the subscription, it’ll be ignored on application restart and the subscription will resume from the specified StartPosition. |

The combination of start and resume behavior can enable various use cases. For example:

| StartPosition | ResumeBehavior | Use Case |
| --- | --- | --- |
| BEGINNING_OF_TIME | DEFAULT | Start a new subscription from “beginning of time”, but when the app is restarted, continue from the last processed event (i.e. don’t replay all historic events again). |
| BEGINNING_OF_TIME | SAME_AS_START_AT | The subscription will always (even on restart) subscribe to all historic events, effectively making this an in-memory subscription. Note that this subscription will be called on each instance of the application even though competing consumer behavior is configured. |
| NOW | SAME_AS_START_AT | The subscription will always (even on restart) subscribe to events from “now”, ignoring the historic events. This will effectively make this an in-memory subscription. Note that this subscription will be called on each instance of the application even though competing consumer behavior is configured. |
| NOW | DEFAULT | The subscription starts subscribing to events from “now” when created, but continues (resumes) from the last received event on restart. |
| DEFAULT | DEFAULT | Same as above (this is the default behavior if start position and resume behavior are not specified). |
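
The combinations above can be summarised as a small decision function. The sketch below is a simplified model of the documented behavior, not Occurrent's implementation: the `SubscriptionStartSketch` class, the locally defined enums and the string results are hypothetical, and treating a DEFAULT start position as "use the checkpoint if there is one" regardless of resume behavior is an assumption.

```java
import java.util.Optional;

final class SubscriptionStartSketch {
    enum StartPosition { BEGINNING_OF_TIME, NOW, DEFAULT }
    enum ResumeBehavior { DEFAULT, SAME_AS_START_AT }

    // Describes where a subscription starts reading, given its settings and an optional stored checkpoint
    static String resolveStart(StartPosition startAt, ResumeBehavior resumeBehavior, Optional<String> checkpoint) {
        // Assumption: a stored checkpoint is ignored only when SAME_AS_START_AT
        // is combined with an explicit start position
        boolean ignoreCheckpoint = resumeBehavior == ResumeBehavior.SAME_AS_START_AT
                && startAt != StartPosition.DEFAULT;
        if (checkpoint.isPresent() && !ignoreCheckpoint) {
            return "checkpoint " + checkpoint.get();
        }
        return startAt == StartPosition.BEGINNING_OF_TIME ? "beginning of time" : "now";
    }

    public static void main(String[] args) {
        Optional<String> firstRun = Optional.empty();
        Optional<String> afterRestart = Optional.of("position-42");

        // prints "beginning of time"
        System.out.println(resolveStart(StartPosition.BEGINNING_OF_TIME, ResumeBehavior.DEFAULT, firstRun));
        // prints "checkpoint position-42"
        System.out.println(resolveStart(StartPosition.BEGINNING_OF_TIME, ResumeBehavior.DEFAULT, afterRestart));
        // prints "beginning of time"
        System.out.println(resolveStart(StartPosition.BEGINNING_OF_TIME, ResumeBehavior.SAME_AS_START_AT, afterRestart));
    }
}
```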

Selective Events

If DomainEvent is a sealed interface/class (as in the examples above), then all events implementing this interface/class will be received when subscribing to this event. You can of course subscribe to an individual event, such as DomainEvent2. But if you want to receive only some of the events that implement the DomainEvent interface, you can use eventTypes(). For example, if you want to subscribe to both DomainEvent1 and DomainEvent3 but handle them as a DomainEvent:

@Component
public class Example {
    
    @Subscription(id = "printAllDomainEvents", eventTypes = {DomainEvent1.class, DomainEvent3.class})
    void printSomeDomainEvents(DomainEvent e) {
        System.out.println("Received any of the specified domain events: %s".formatted(e));    
    }
}
@Component
class Example {
    
    @Subscription(id = "printAllDomainEvents", eventTypes = [DomainEvent1::class, DomainEvent3::class])
    fun printSomeDomainEvents(e: DomainEvent) {
        println("Received any of the specified domain events: $e")
    }
}
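
Conceptually, selecting event types amounts to a type check before the handler method is invoked. The following JDK-only sketch models that filter; the `EventTypeFilterSketch` class and its `shouldDeliver` method are hypothetical, and treating an empty set as "deliver every DomainEvent" is an assumption rather than documented behavior.

```java
import java.util.Set;

final class EventTypeFilterSketch {
    sealed interface DomainEvent permits DomainEvent1, DomainEvent2, DomainEvent3 {}
    record DomainEvent1(String someData1) implements DomainEvent {}
    record DomainEvent2(String someData2) implements DomainEvent {}
    record DomainEvent3(String someData3) implements DomainEvent {}

    // Deliver the event if no types were selected (assumption) or if it is an instance of a selected type
    static boolean shouldDeliver(Set<Class<? extends DomainEvent>> eventTypes, DomainEvent event) {
        return eventTypes.isEmpty() || eventTypes.stream().anyMatch(type -> type.isInstance(event));
    }

    public static void main(String[] args) {
        Set<Class<? extends DomainEvent>> selected = Set.of(DomainEvent1.class, DomainEvent3.class);

        System.out.println(shouldDeliver(selected, new DomainEvent1("a"))); // prints true
        System.out.println(shouldDeliver(selected, new DomainEvent2("b"))); // prints false
        System.out.println(shouldDeliver(selected, new DomainEvent3("c"))); // prints true
    }
}
```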

Event Metadata

Sometimes it can be useful to get the metadata associated with the received event. For this reason, you can add a parameter to the method annotated with @Subscription of type org.occurrent.dsl.subscription.blocking.EventMetadata. It’ll contain all extension properties added to the CloudEvent, for example:

@Component
public class Example {
    
    @Subscription(id = "printAllDomainEvents")
    void printAllDomainEvents(DomainEvent e, EventMetadata metadata) {
        String streamId = metadata.getStreamId();
        long streamVersion = metadata.getStreamVersion();
        Object myCustomValue = metadata.get("MyCustomValue");
        ...
    }
}
@Component
class Example {
    
    @Subscription(id = "printAllDomainEvents")
    fun printAllDomainEvents(e: DomainEvent, metadata: EventMetadata) {
        val streamId: String = metadata.streamId
        val streamVersion: Long = metadata.streamVersion
        val myCustomValue: Any? = metadata["MyCustomValue"]
        // ...
    }
}

Subscription Startup Mode

You can configure whether the subscription should start before the application is ready to receive requests. For example, it might be very important that a certain subscription is started before the first web request comes in:

@Component
public class Example {
    
    @Subscription(id = "printAllDomainEvents", startupMode = StartupMode.WAIT_UNTIL_STARTED)
    void printAllDomainEvents(DomainEvent e) {
        System.out.println("Received domain event %s".formatted(e));    
    }
}
@Component
class Example {

    @Subscription(id = "printAllDomainEvents", startupMode = StartupMode.WAIT_UNTIL_STARTED)
    fun printAllDomainEvents(e: DomainEvent) {
        println("Received domain event $e")
    }
}

In other cases, such as when replaying a huge number of historic events, it might be better to let the application start and let the processing of historic events happen in the background:

@Component
public class Example {
    
    @Subscription(id = "printAllDomainEvents", startAt = StartPosition.BEGINNING_OF_TIME, startupMode = StartupMode.BACKGROUND)
    void printAllDomainEvents(DomainEvent e) {
        System.out.println("Received domain event %s".formatted(e));    
    }
}
@Component
class Example {

    @Subscription(id = "printAllDomainEvents", startAt = StartPosition.BEGINNING_OF_TIME, startupMode = StartupMode.BACKGROUND)
    fun printAllDomainEvents(e: DomainEvent) {
        println("Received domain event $e")
    }
}

Here’s a summary of the different startup modes:

| StartupMode | Description |
| --- | --- |
| DEFAULT | Determine the startup mode based on the properties of the subscription (such as startAt() and resumeBehavior()). It’ll use BACKGROUND if the subscription needs to replay historic events before subscribing to new ones (e.g. if startAt() is StartPosition.BEGINNING_OF_TIME), otherwise WAIT_UNTIL_STARTED will be used. |
| WAIT_UNTIL_STARTED | The subscription will wait until it’s started up fully before Spring continues starting the rest of the application. Most of the time this is recommended because otherwise there could be a small chance that a request is received by your application before the subscription has bootstrapped completely. This can lead to the subscription missing this event. This is only true if the subscription is brand new. As soon as the subscription has received an event that is stored in an org.occurrent.subscription.api.blocking.SubscriptionPositionStorage (checkpointing), it’ll never miss an event during startup. |
| BACKGROUND | The subscription will NOT wait until it’s started up fully before Spring continues starting the rest of the application; instead, it will be started in the background. Typically, this is useful if you instruct the subscription to start at an earlier date (such as the beginning of time), and you have a lot of events to read before the subscription has caught up. In this case, you may wish to start the Spring application before the subscription has fully started (i.e., before all historic events have been replayed) because waiting for all events to replay takes too long. The subscription will then replay all historic events in the background before switching to continuous mode. |
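
The way DEFAULT resolves to one of the two concrete modes can be written as a one-line rule. This is a simplified model of the description above, not Occurrent's code: the `StartupModeSketch` class is hypothetical, and whether a replay is needed is passed in as a plain boolean.

```java
final class StartupModeSketch {
    enum StartupMode { DEFAULT, WAIT_UNTIL_STARTED, BACKGROUND }

    // An explicitly configured mode is used as-is; DEFAULT picks BACKGROUND only when
    // historic events must be replayed before subscribing to new ones
    static StartupMode effectiveMode(StartupMode configured, boolean mustReplayHistoricEvents) {
        if (configured != StartupMode.DEFAULT) {
            return configured;
        }
        return mustReplayHistoricEvents ? StartupMode.BACKGROUND : StartupMode.WAIT_UNTIL_STARTED;
    }

    public static void main(String[] args) {
        System.out.println(effectiveMode(StartupMode.DEFAULT, true));             // prints BACKGROUND
        System.out.println(effectiveMode(StartupMode.DEFAULT, false));            // prints WAIT_UNTIL_STARTED
        System.out.println(effectiveMode(StartupMode.WAIT_UNTIL_STARTED, true));  // prints WAIT_UNTIL_STARTED
    }
}
```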

Blogs

Johan has written about Occurrent on his blog:

  1. Occurrent - Event Sourcing for the JVM

Contact & Support

Would you like to contribute to Occurrent? That’s great! You should join the mailing list or contribute on GitHub. The mailing list can also be used for support and discussions.

Credits

Thanks to Per Ökvist for discussions and ideas, and David Åse for letting me fork the awesome Javalin website. Credits should also go to Alec Henninger for his work on competing consumer support for MongoDB change streams.