Fork me on GitHub

Documentation

The documentation on this page is always for the latest version of Occurrent, currently 0.3.0.

If you like Occurrent, please consider starring us on GitHub: Like Occurrent? Star us on GitHub:

Introduction

Occurrent is in an early stage so API's, and even the data model, are subject to change in the future.

Occurrent is an event sourcing library, or if you wish, a set of event sourcing utilities for the JVM, created by Johan Haleby. There are many options for doing event sourcing in Java already so why build another one? There are a few reasons for this besides the intrinsic joy of doing something yourself:

Concepts

Event Sourcing

Every system needs to store and update data somehow. Many times this is done by storing the current state of an entity in the database. For example, you might have an entity called Order stored in a order table in a relational database. Everytime something happens to the order, the table is updated with the new information and replacing the previous values. Event Sourcing is a technique that instead stores the changes, represented by events, that occurred for the entity. Events are facts, things that have happened, and they should never be updated. This means that not only can you derive the current state from the set of historic events, but you also know which steps that were involved to reach the current state.

EventStore

The event store is a place where you store events. Events are immutable pieces of data describing state changes for a particular stream. A stream is a collection of events that are related, typically but not limited to, a particular entity. For example a stream may include all events for a particular instance of a game or an order.

Occurrent provides an interface, EventStore, that allows to read and write events from the database. The EventStore interface is actually composed of various smaller interfaces since not all databases supports all aspects provided by the EventStore interface. Here’s an example that writes a cloud event to the event store and read it back:

CloudEvent event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }".getBytes(StandardCharsets.UTF_8))
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event));

// Read
EventStream<CloudEvent> eventStream = eventStore.read("streamId");
val event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }".toByteArray())
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event))

// Read
val eventStream : EventStream<CloudEvent> = eventStore.read("streamId")

Note that when reading the events, the EventStore won’t simply return a Stream of CloudEvent’s, instead it returns a wrapper called EventStream.

EventStream

The EventStream contains the CloudEvent’s for a stream and the version of the stream. The version can be used to guarantee that only one thread/process is allowed to write to the stream at the same time, i.e. optimistic locking. This can be achieved by including the version in a write condition.

Note that reading a stream that doesn’t exist (e.g. eventStore.read("non-existing-id") will return an instance of EventStream with an empty stream of events and 0 as version number. The reason for this is that you can use the same “application service” (a fancy word for a piece of code that loads events from the event store, applies them to the domain model and writes the new events returned to the event store) for both entity creation and subsequent use cases. For example consider this simple domain model:

public class WordGuessingGame {
	public static Stream<CloudEvent> startNewGame(String gameId, String wordToGuess) {	
		...
	}

	public static Stream<CloudEvent> guessWord(Stream<CloudEvent> eventStream, String word) {
		...
	}
}
// Note that the functions could might as well be placed directly in a package 
 object WordGuessingGame {
    fun startNewGame(gameId : String, wordToGuess : String) : Stream<CloudEvent> = ...	
 
    fun guessWord(eventStream : Stream<CloudEvent>, word : String) : Stream<CloudEvent> = ...
 }

Then we could write a generic application service that takes a higher-order function (Stream<CloudEvent>) -> Stream<CloudEvent>:

public class ApplicationService {

    private final EventStore eventStore;

    public ApplicationService(EventStore eventStore) {
        this.eventStore = eventStore;
    }

    public void execute(String streamId, Function<Stream<CloudEvent>, Stream<CloudEvent>> functionThatCallsDomainModel) {
        // Read all events from the event store for a particular stream
        EventStream<CloudEvent> eventStream = eventStore.read(streamId);

        // Invoke the domain model  
        Stream<CloudEvent> newEvents = functionThatCallsDomainModel.apply(eventStream.events());

        // Persist the new events  
        eventStore.write(streamId, eventStream.version(), newEvents);
    }
}
class ApplicationService constructor (val eventStore : EventStore) {

    fun execute(streamId : String, functionThatCallsDomainModel : (Stream<CloudEvent>) -> Stream<CloudEvent>) {
        // Read all events from the event store for a particular stream
        val  eventStream : EventStream<CloudEvent> = eventStore.read(streamId)
        
         // Invoke the domain model 
        val newEvents = functionThatCallsDomainModel(eventStream.events())

        // Persist the new events
        eventStore.write(streamId, eventStream.version(), newEvents)
    }
}
Note that typically the domain model, WordGuessingGame in this example, would not return CloudEvents but rather a stream or list of a custom data structure, domain events, that would then be converted to CloudEvent's. This is not shown in this example above for brevity.

You could then call the application service like this regardless of you’re starting a new game or not:

// Here we image that we have received the data required to start a new game, e.g. from a REST endpoint. 
String gameId = ...
String wordToGuess = ...;

// Then we invoke the application service to start a game:
applicationService.execute(gameId, __ -> WordGuessingGame.startNewGame(gameId, wordToGuess));  

// Later a player guess a word:
String gameId = ...
String guess = ...;

// We thus invoke the application service again to guess the word:
applicationService.execute(gameId, events -> WordGuessingGame.guessWord(events, gameId, guess));
// Here we image that we have received the data required to start a new game, e.g. from a REST endpoint. 
val gameId : String = ...
val wordToGuess : String = ...;

// Then we invoke the application service to start a game:
applicationService.execute(gameId) { 
    WordGuessingGame.startNewGame(gameId, wordToGuess)
}  

// Later a player guess a word:
val gameId : String = ...
val guess : String = ...;

// We thus invoke the application service again to guess the word:
applicationService.execute(gameId, events -> WordGuessingGame.guessWord(events, gameId, guess));

Writing application services like this is both powerful and simple (once you get used to it). There’s less need for explicit commands and command handlers (the application service is a kind of command handler). You can also use other functional techniques such as partial application to make the code look, arguably, even nicer. It’s also easy to compose several calls to the domain model into one by using standard functional composition techniques. For example in this case you might consider both starting a game and let the player make her first guess from a single request to the REST API. No need to change the domain model to do this, just use function composition.

Write Condition

A “write condition” can be used to specify conditional writes to the event store. Typically, the purpose of this would be to achieve optimistic concurrent control (optimistic locking) of an event stream.

For example, image you have an Account to which you can deposit and withdraw money. A business rule says that it’s not allowed to have a negative balance on an account. Now imagine an account that is shared between two persons and contains 20 EUR. Person “A” wants to withdraw 15 EUR and person “B” wants to withdraw 10 EUR. If they try to do this, an error message should be presented to one of them since the account balance would be negative. But what happens if both persons try to withdraw the money at the same time? Let’s have a look:

// Person A at _time 1_
EventStream<CloudEvent> eventStream = eventStore.read("account1"); // A

// "withdraw" is a pure function in the Account domain model which takes a Stream
//  of all current events and the amount to withdraw, and returns new events. 
// In this case, a "MoneyWasWithdrawn" event is returned,  since 15 EUR is OK to withdraw.     
Stream<CloudEvent> events = Account.withdraw(eventStream.events(), Money.of(15, EUR));

// We write the new events to the event store  
eventStore.write("account1", events);

// Now in a different thread let's imagine Person B at _time 1_
EventStream<CloudEvent> eventStream = eventStore.read("account1"); // B

// Again we want to withdraw money, and the system will think this is OK, 
// since event streams for A and B has not yet recorded that the balance is negative.   
Stream<CloudEvent> events = Account.withdraw(eventStream.events(), Money.of(10, EUR));

// We write the new events to the event store without any problems! 😱 
// But this shouldn't work since it would violate the business rule!   
eventStore.write("account1", events);
// Person A at _time 1_
val eventStream = eventStore.read("account1") // A

// "withdraw" is a pure function in the Account domain model which takes a Stream
//  of all current events and the amount to withdraw. It returns a stream of 
// new events, in this case only a "MoneyWasWithdrawn" event,  since 15 EUR is OK to withdraw.     
val events = Account.withdraw(eventStream.events(), Money.of(15, EUR))

// We write the new events to the event store  
eventStore.write("account1", events)

// Now in a different thread let's imagine Person B at _time 1_
val eventStream = eventStore.read("account1") // B

// Again we want to withdraw money, and the system will think this is OK, 
// since the Account thinks that 10 EUR will have a balance of 10 EUR after 
// the withdrawal.   
val events = Account.withdraw(eventStream.events(), Money.of(10, EUR))

// We write the new events to the event store without any problems! 😱 
// But this shouldn't work since it would violate the business rule!   
eventStore.write("account1", events)
Note that typically the domain model, Account in this example, would not return CloudEvents but rather a stream or list of a custom data structure, domain events, that would then be converted to CloudEvent's. This is not shown in the example above for brevity, look at the command section for a more real-life example.

To avoid the problem above we want to make use of conditional writes. Let’s see how:

// Person A at _time 1_
EventStream<CloudEvent> eventStream = eventStore.read("account1"); // A
long currentVersion = eventStream.version(); 

// Withdraw money
Stream<CloudEvent> events = Account.withdraw(eventStream.events(), Money.of(15, EUR));

// We write the new events to the event store with a write condition that implies
// that the version of the event stream must be A.   
eventStore.write("account1", currentVersion, events);

// Now in a different thread let's imagine Person B at _time 1_
EventStream<CloudEvent> eventStream = eventStore.read("account1"); // A 
long currentVersion = eventStream.version();

// Again we want to withdraw money, and the system will think this is OK, 
// since event streams for A and B has not yet recorded that the balance is negative.   
Stream<CloudEvent> events = Account.withdraw(eventStream.events(), Money.of(10, EUR));

// We write the new events to the event store with a write condition that implies
// that the version of the event stream must be B. But now Occurrent will throw
// a "org.occurrent.eventstore.api.WriteConditionNotFulfilledException" since, in this
// case A was slightly faster, and the version of the event stream no longer match!
// The entire operation should be retried for person B and when "Account.withdraw(..)"
// is called again it could throw a "CannotWithdrawSinceBalanceWouldBeNegative" exception. 
eventStore.write("account1", currentVersion, events); 
// Person A at _time 1_
val eventStream = eventStore.read("account1") // A
val currentVersion = eventStream.version() 

// Withdraw money
val events = Account.withdraw(eventStream.events(), Money.of(15, EUR));

// We write the new events to the event store with a write condition that implies
// that the version of the event stream must be A.   
eventStore.write("account1", currentVersion, events)

// Now in a different thread let's imagine Person B at _time 1_
val eventStream = eventStore.read("account1"); // A 
val currentVersion = eventStream.version()

// Again we want to withdraw money, and the system will think this is OK, 
// since event streams for A and B has not yet recorded that the balance is negative.   
val events = Account.withdraw(eventStream.events(), Money.of(10, EUR))

// We write the new events to the event store with a write condition that implies
// that the version of the event stream must be B. But now Occurrent will throw
// a "org.occurrent.eventstore.api.WriteConditionNotFulfilledException" since, in this
// case A was slightly faster, and the version of the event stream no longer match!
// The entire operation should be retried for person B and when "Account.withdraw(..)"
// is called again it could throw a "CannotWithdrawSinceBalanceWouldBeNegative" exception. 
eventStore.write("account1", currentVersion, events) 

What you’ve seen above is a simple, but widely used, form of write condition. Actually, doing eventStore.write("streamId", version, events) is just a shortcut for:

eventStore.write("streamId", WriteCondition.streamVersionEq(version), events);
eventStore.write("streamId", WriteCondition.streamVersionEq(version), events)
WriteCondition can be imported from "org.occurrent.eventstore.api.WriteCondition".

But you can compose a more advanced write condition using a Condition:

eventStore.write("streamId", WriteCondition.streamVersion(and(lt(10), ne(5)), events);
eventStore.write("streamId", WriteCondition.streamVersion(and(lt(10), ne(5)), events)

where lt, ne and and is statically imported from org.occurrent.condition.Condition.

EventStore Queries

Since Occurrent builds on-top of existing databases it’s ok, given that you know what you’re doing*, to use the strengths of these databases. One such strength is that databases typically have good querying support. Occurrent exposes this with the EventStoreQueries interface that an EventStore implementation may implement to expose querying capabilities. For example:

OffsetDateTime lastTwoHours = OffsetDateTime.now().minusHours(2); 
// Query the database for all events the last two hours that have "subject" equal to "123" and sort these in descending order
Stream<CloudEvent> events = eventStore.query(time(lte(lastTwoHours)).and(subject("123")), SortBy.TIME_DESC);
val lastTwoHours = OffsetDateTime.now().minusHours(2);
// Query the database for all events the last two hours that have "subject" equal to "123" and sort these in descending order
val events : Stream<CloudEvent> = eventStore.query(time(lte(lastTwoHours)).and(subject("123")), SortBy.TIME_DESC)
*There's a trade-off when it's appropriate to query the database vs creating materialized views/projections and you should most likely create indexes to allow for fast queries.

The time and subject methods are statically imported from org.occurrent.filter.Filter and lte is statically imported from org.occurrent.condition.Condition.

EventStoreQueries is not bound to a particular stream, rather you can query any stream (or multiple streams at the same time). It also provides the ability to get an “all” stream:

// Return all events in an event store sorted by descending order
Stream<CloudEvent> events = eventStore.all(SortBy.TIME_DESC);
// Return all events in an event store sorted by descending order
val events : Stream<CloudEvent> = eventStore.all(SortBy.TIME_DESC)

The EventStoreQueries interface also supports skip and limit capabilities which allows for pagination:

// Skip 42, limit 1024
Stream<CloudEvent> events = eventStore.all(42, 1024);
// Skip 42, limit 1024
val events : Stream<CloudEvent> = eventStore.all(42, 1024)

To get started with an event store refer to Choosing An EventStore.

Subscriptions

A subscription is a way to get notified when new events are written to an event store. Typically, a subscription will forward the event to another piece of infrastructure such as a message bus, or create views from the events (such as projections, sagas, snapshots etc). There are two different kinds of API’s, the first one is a blocking API represented by the BlockingSubscription interface (in the org.occurrent:subscription-api-blocking module), and second one is a reactive API represented by the ReactorSubscription interface (in the org.occurrent:subscription-api-reactor module).

The blocking API is callback based, which is fine if you’re working with individual events (you can of course write a simple function that aggregates events into batches). If you want to work with streams of data, the ReactorSubscription is probably a better option since it’s using the Flux publisher from project reactor.

Note that it’s fine to use ReactorSubscription, even though the event store is implemented using the blocking api, and vice versa. If the datastore allows it, you can also run subscriptions in a different process than the processes reading and writing to the event store.

To get started with subscriptions refer to Using Subscriptions.

EventStore Operations

Occurrent event store implementations may optionally also implement the EventStoreOperations interface. It provides means to delete a specific event, or an entire event stream. For example:

// Delete an entire event stream
eventStoreOperations.deleteEventStream("streamId");
// Delete a specific event
eventStoreOperations.deleteEvent("cloudEventId", cloudEventSource);
// Delete an entire event stream
eventStoreOperations.deleteEventStream("streamId")
// Delete a specific event
eventStoreOperations.deleteEvent("cloudEventId", cloudEventSource)

These are probably operations that you want to use sparingly. Typically, you never want to remove any events, but there are some cases, such as GDPR or other regulations, that requires the deletion of an event or an entire event stream. You should be aware that there are other ways to solve this though. One way would be to encrypt personal data and throw away the key when the user no longer uses the service. Another would be to store personal data outside the event store.

Another feature provided by EventStoreOperations is the ability to update an event. Again, this is not something you normally want to do, but it can be useful for certain strategies of GDPR compliance. For example maybe you want to remove or update personal data in an event when a users unregisters from your service. Here’s an example:

eventStoreOperations.updateEvent("cloudEventId", cloudEventSource, cloudEvent -> {
    return CloudEventBuilder.v1(cloudEvent).withData(removePersonalDetailsFrom(cloudEvent)).build();
});
eventStoreOperations.updateEvent("cloudEventId", cloudEventSource) { cloudEvent -> 
    CloudEventBuilder.v1(cloudEvent).withData(removePersonalDetailsFrom(cloudEvent)).build()
})

Views

Occurrent doesn’t have any special components for creating views/projections. Instead, you simply create a subscription in which you can create and store the view as you find fit. But this doesn’t have to be difficult! Here’s a trivial example of a view that maintains the number of ended games. It does so by inceasing the “numberOfEndedGames” field in an (imaginary) database for each “GameEnded” event that is written to the event store:

// An imaginary database API
Database someDatabase = ...
// Subscribe to all "GameEnded" events by starting a subscription named "my-view" 
// and increase "numberOfEndedGames" for each ended game.   
subscription.subscribe("my-view", filter(type("GameEnded")), cloudEvent -> someDatabase.inc("numberOfEndedGames"));        
// An imaginary database API
val someDatabase : Database = ...
// Subscribe to all "GameEnded" events by starting a subscription named "my-view" 
// and increase "numberOfEndedGames" for each ended game. 
subscription.subscribe("my-view", filter(type("GameEnded"))) {  
    someDatabase.inc("numberOfEndedGames")
}

Where filter is imported from org.occurrent.subscription.OccurrentSubscriptionFilter and type is imported from org.occurrent.condition.Condition.

While this is a trivial example it shouldn’t be difficult to create a view that is backed by a JPA entity in a relational database based on a subscription.

Commands

A command is used to represent an action in an event sourced system, i.e. something that you want to do. They’re different, in a very important way, from events in that commands can fail or be rejected, where-as events cannot. A typical example of a command would be a data structure whose name is defined as an imperative verb, for example PlaceOrder. The resulting event, if the command is processed successfully, could then be OrderWasPlaced. However, in Occurrent, as explained in more detail in the Command Philosophy section below, you are encouraged to start off by not using explicit data structures for commands unless you want to. Occurrent instead promotes pure functions to represent commands. Combine this with function composition and you have a powerful way to invoke the domain model (refer to the application service for examples).

Command Philosophy

Occurrent doesn’t contain a built-in command bus. The reason for this is that I’m not convinced that it’s needed in a majority of cases. To send “commands” to another service (remotely) one could call a REST API or make an RPC invocation instead of using a proprietary command bus. One exception to this is if you need location transparency.

But what about internally? For example if a service exposes a REST API and upon receiving a request it publishes a command that’s somehow picked up and routed to a function in your domain model. This is where an application service becomes useful. However, let’s first explore the rationale behind the philosophy of Occurrent. In other frameworks, it’s not uncommon that you define your domain model like this:

public class WordGuessingGame extends AggregateRoot {

    @AggregateId
    private String gameId;
    private String wordToGuess;

    @HandleCommand
    public void handle(StartNewGameCommand startNewGameCommand) {
        // Insert some validation and logic
        ... 
        // Publish an event using the "publish" method from AggregateRoot
        publish(new WordGuessingGameWasStartedEvent(...));
    }
    
    @HandleCommand
    public void handle(GuessWordCommand guessWordCommand) {
        // Some validation and implementation ...
        ...	
        
        // Publish an event using the "publish" method from AggregateRoot
        publish(new WordGuessedEvent(...));
    }

    @HandleEvent
    public void handle(WordGuessingGameWasStartedEvent e) {
        this.gameId = e.getGameId();
        this.wordToGuess = e.getWordToGuess();    
    }        

    ... 
}
 class WordGuessingGame : AggregateRoot() {
 
     @AggregateId
     var gameId : String
     var wordToGuess : String
 
     @HandleCommand
     fun handle(startNewGameCommand : StartNewGameCommand) {
         // Insert some validation and logic
         ... 
         // Publish an event using the "publish" method from AggregateRoot
         publish(WordGuessingGameWasStartedEvent(...))
     }
     
     @HandleCommand
     fun handle(guessWordCommand : GuessWordCommand) {
         // Some validation and implementation ...
         ...	
         
         // Publish an event using the "publish" method from AggregateRoot
         publish(WordGuessedEvent(...))
     }
 
     @HandleEvent
     fun handle(e : WordGuessingGameWasStartedEvent) {
         gameId = e.getGameId()
         wordToGuess = e.getWordToGuess()    
     }        
 
     ... 
 }
This is a made-up example of an imaginary event sourcing framework, it's not how you're encouraged to implement a domain model using Occurrent.

Let’s look at a “command” and see what it typically looks like in these frameworks:

public class StartNewGameCommand {
    
    @AggregateId
    private String gameId;
    private String wordToGuess;
    
    public void setGameId(String gameId) {
        this.gameId = gameId;
    }
    
    public String getGameId() {
        return gameId;
    }

    public void setWordToGuess(String wordToGuess) {
        this.gameId = gameId;
    }
    
    public String getWordToGuess() {
        return wordToGuess;
    }
    
    // Equals/hashcode/tostring methods are excluded for breivty
}
data class StartNewGameCommand(@AggregateId var gameId: String, val wordToGuess : String)

Now that we have our WordGuessingGame implementation and a command we can dispatch it to a command bus:

commandbus.dispatch(new StartNewGameCommand("someGameId", "Secret word"));
commandbus.dispatch(StartNewGameCommand("someGameId", "Secret word"))

From a typical Java perspective one could argue that this is not too bad. But it does have a few things one could improve upon from a broader perspective:

  1. The WordGuessingGame is complecting several things that may be modelled separately. Data, state, behavior, command- and event routing and event publishing are all defined in the same model (the WordGuessingGame class). It also uses framework specific annotations, classes and inheritance inside your domain model which is something you want to avoid. For small examples like this it arguably doesn’t matter, but if you have complex logic and a large system, it probably will in my experience. Keeping state and behavior separate allows for easier testing, referential transparency and function composition. It allows treating the state as a value which has many benefits.
  2. Commands are defined as explicit data structures (this is not necessarily a bad thing but it will add to your code base) when arguably they don’t have to.

Commands in Occurrent

So how would one dispatch commands in Occurrent? There’s actually nothing stopping you from implementation a simple command bus, create explicit commands, and dispatch them the way we did in the example above. Actually it would be relatively easy to implement the imaginary framework above using Occurrent components. But if you’ve recognized the problems described above you’re probably looking for a different approach. Here’s another way you can do it. First of all let’s refactor our domain model to pure functions, without any state or dependencies to Occurrent or any other library/framework.

public class WordGuessingGame {
	public static Stream<DomainEvent> startNewGame(String gameId, String wordToGuess) {	
		...
	}

	public static Stream<DomainEvent> guessWord(Stream<DomainEvent> eventStream, String word) {
		...
	}
}
// Note that the functions could might as well be placed directly in a package.
// You might also want to use a Kotlin Sequence or List instead of a Stream
 object WordGuessingGame {
    fun startNewGame(gameId : String, wordToGuess : String) : Stream<CloudEvent> = ...	
 
    fun guessWord(eventStream : Stream<DomainEvent>, word : String) : Stream<DomainEvent> = ...
 }

If you define your behavior like this it’ll be really easy to test (and also to compose using normal function composition techniques). There are no side-effects (such as publishing events) which also allows for easier testing and local reasoning.

But where are our commands!? In this example we’ve decided to represent them as functions. I.e. the “command” is modeled as simple function, e.g. startNewGame! But wait, how can I dispatch commands to this function? Just create or copy a generic ApplicationService class like the one below (or use the generic application service provided by Occurrent) if you’re using an object-oriented approach:

public class ApplicationService {

    private final EventStore eventStore;
    private final Function<CloudEvent, DomainEvent> convertCloudEventToDomainEvent;
    private final Function<DomainEvent, CloudEvent> convertDomainEventToCloudEvent;

    public ApplicationService(EventStore eventStore, 
                              Function<CloudEvent, DomainEvent> convertCloudEventToDomainEvent, 
                              Function<DomainEvent, CloudEvent> convertDomainEventToCloudEvent) {
        this.eventStore = eventStore;
        this.convertCloudEventToDomainEvent = convertCloudEventToDomainEvent;
        this.convertDomainEventToCloudEvent = convertDomainEventToCloudEvent;
    }

    public void execute(String streamId, Function<Stream<DomainEvent>, Stream<DomainEvent>> functionThatCallsDomainModel) {
        // Read all events from the event store for a particular stream
        EventStream<CloudEvent> eventStream = eventStore.read(streamId);
        // Convert the cloud events into domain events
        Stream<DomainEvent> domainEventsInStream = eventStream.events().map(convertCloudEventToDomainEvent);

        // Call a pure function from the domain model which returns a Stream of domain events  
        Stream<DomainEvent> newDomainEvents = functionThatCallsDomainModel.apply(domainEventsInStream);

        // Convert domain events to cloud events and write them to the event store  
        eventStore.write(streamId, eventStream.version(), newDomainEvents.map(convertDomainEventToCloudEvent));
    }
}
class ApplicationService constructor (val eventStore : EventStore, 
                                      val convertCloudEventToDomainEvent : (CloudEvent) -> DomainEvent, 
                                      val convertDomainEventToCloudEvent : (DomainEvent) -> CloudEvent) {

    fun execute(streamId : String, functionThatCallsDomainModel : (Stream<DomainEvent>) -> Stream<DomainEvent>) {
        // Read all events from the event store for a particular stream
        val  eventStream : EventStream<CloudEvent> = eventStore.read(streamId)
        // Convert the cloud events into domain events
        val domainEventsInStream : Stream<DomainEvent> = eventStream.events().map(convertCloudEventToDomainEvent)

        // Call a pure function from the domain model which returns a Stream of domain events
        val newDomainEvents = functionThatCallsDomainModel(domainEventsInStream)

        // Convert domain events to cloud events and write them to the event store
        eventStore.write(streamId, eventStream.version(), newDomainEvents.map(convertDomainEventToCloudEvent))
    }
}

and then use the ApplicationService like this:

// A function that converts a CloudEvent to a "domain event"
Function<CloudEvent, DomainEvent> convertCloudEventToDomainEvent = ..
// A function that a "domain event" to a CloudEvent
Function<DomainEvent, CloudEvent> convertDomainEventToCloudEvent = ..
EventStore eventStore = ..
ApplicationService applicationService = new ApplicationService(eventStore, convertCloudEventToDomainEvent, convertDomainEventToCloudEvent);

// Now in your REST API use the application service:
String gameId = ... // From a form parameter
String wordToGuess = .. // From a form parameter
applicationService.execute(gameId, events -> WordGuessingGame.startNewGame(gameId, wordToGuess));
// A function that converts a CloudEvent to a "domain event"
val convertCloudEventToDomainEvent : (CloudEvent) -> DomainEvent = ..
// A function that a "domain event" to a CloudEvent
val convertDomainEventToCloudEvent = (DomainEvent) -> CloudEvent  = ..
val eventStore : EventStore = ..
val applicationService = ApplicationService(eventStore, convertCloudEventToDomainEvent, convertDomainEventToCloudEvent);

// Now in your REST API use the application service:
val gameId = ... // From a form parameter
val wordToGuess = .. // From a form parameter
applicationService.execute(gameId) { events -> 
    WordGuessingGame.startNewGame(gameId, wordToGuess)
}

If you’re using a more functional approach you can create a function like this that represents an application service:


public class ApplicationService {

    public static void execute(EventStore eventStore, String streamId, Function<Stream<DomainEvent>, Stream<DomainEvent>> functionThatCallsDomainModel) {
        // Read all events from the event store for a particular stream
        EventStream<CloudEvent> eventStream = eventStore.read(streamId);
        Stream<CloudEvent> eventsInStream = eventStream.events();

        // Call a pure function from the domain model which returns a Stream of events  
        Stream<CloudEvent> newEvents = functionThatCallsDomainModel.apply(eventsInStream);

        // Write the new events to the event store  
        eventStore.write(streamId, eventStream.version(), newEvents);
    }
}
fun execute(eventStore : EventStore, streamId : String, functionThatCallsDomainModel : (Stream<DomainEvent>) -> Stream<DomainEvent>) {
    // Read all events from the event store for a particular stream
    val eventStream : EventStream<CloudEvent> = eventStore.read(streamId)
    val domainEventsInStream : Stream<CloudEvent> = eventStream.events()

    // Call a pure function from the domain model which returns a Stream of events
    val newEvents = functionThatCallsDomainModel(domainEventsInStream)

    // Write the new events to the event store 
    eventStore.write(streamId, eventStream.version(), newEvents)
}

and when you have instantiated an EventStore and created functions that converts a CloudEvent to a DomainEvent and vice versa:

// A function that converts a CloudEvent to a "domain event"
Function<CloudEvent, DomainEvent> convertCloudEventToDomainEvent = ..
// A function that a "domain event" to a CloudEvent
Function<DomainEvent, CloudEvent> convertDomainEventToCloudEvent = ..
// The event store
EventStore eventStore = ..
// A function that converts a CloudEvent to a "domain event"
val convertCloudEventToDomainEvent : (CloudEvent) -> DomainEvent = 
// A function that a "domain event" to a CloudEvent
val convertDomainEventToCloudEvent = (DomainEvent) -> CloudEvent  = ..
val eventStore : EventStore = ..

then you can compose these functions into a generic “application service function”:

// This example is using an imaginary FP library for Java that has methods such as "partially". 
// You might want to look into "Vavr" or "Functional Java" which includes functions like this  
BiConsumer<String, Function<Stream<DomainEvent>, Stream<DomainEvent>> applicationService = (streamId, domainFn) -> 
            partially(ApplicationService::execute, evenStore, streamId)
            .andThen( domainEventsInStream -> domainEventsInStream.map(convertCloudEventToDomainEvent))
            .andThen(domainFn)
            .andThen( newDomainEvents -> newDomainEvents.map(convertDomainEventToCloudEvent));       
// This example is using an imaginary FP library for Kotlin that has methods such as "partially" and "andThen". 
// You might want to look into the Kotlin "arrow" library which include these functions.
fun applicationService(String, (Stream<DomainEvent>) -> Stream<DomainEvent>) : Unit =  
         partially(ApplicationService::execute, evenStore, streamId)
         .andThen { domainEventsInStream -> domainEventsInStream.map(convertCloudEventToDomainEvent)) }
         .andThen(domainFn)
         .andThen { newDomainEvents -> newDomainEvents.map(convertDomainEventToCloudEvent) }

and then use the “application service function” like this:

// Now in your REST API use the application service function:
String gameId = ... // From a form parameter
String wordToGuess = .. // From a form parameter
applicationService.consume(gameId, events -> WordGuessingGame.startNewGame(gameId, wordToGuess));
// Now in your REST API use the application service function:
val gameId = ... // From a form parameter
val wordToGuess = .. // From a form parameter
applicationService(gameId) { events -> 
    WordGuessingGame.startNewGame(gameId, wordToGuess)
}

Command Composition

Many times it’s useful to compose multiple commands into a single unit-of-work. While you’re free to use any means and/or library to achieve this, Occurrent ships with a “command composition” library that you can leverage:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>command-composition</artifactId>
    <version>0.3.0</version>
</dependency>
compile 'org.occurrent:command-composition:0.3.0'
libraryDependencies += "org.occurrent" % "command-composition" % "0.3.0"
@Grab(group='org.occurrent', module='command-composition', version='0.3.0') 
[org.occurrent/command-composition "0.3.0"]
'org.occurrent:command-composition:jar:0.3.0'
<dependency org="org.occurrent" name="command-composition" rev="0.3.0" />

As an example consider this simple domain model:

public class WordGuessingGame {
    public Stream<DomainEvent> startNewGame(Stream<DomainEvents> events, String gameId, String wordToGuess) {
        // Implementation
    }
    
    public Stream<DomainEvent> guessWord(Stream<DomainEvents> events, String guess) {
        // Implementation
    }
}
object WordGuessingGame {
    fun startNewGame(events : Sequence<DomainEvent>), gameId : String, wordToGuess : String) : Sequence<DomainEvent> {
        // Implementation
    } 

    fun guessWord(events : Sequence<DomainEvent>, guess : String) : Sequence<DomainEvent> {
        // Implementation
    }    
}

Imagine that for a specific API you want to allow starting a new game and making a guess in the same request. Instead of changing your domain model, you can use function composition! If you import org.occurrent.application.composition.command.StreamCommandComposition.composeCommands you can do like this:

String gameId = ...
String wordToGuess = ...
String guess = ...
applicationService.execute(gameId, composeCommands(
    events -> WordGuessingGame.startNewGame(events, gameId, wordToGuess),
    events -> WordGuessingGame.makeGuess(events, guess) 
));
val gameId = ...
val wordToGuess = ...
val guess = ...
applicationService.execute(gameId, composeCommands(
    { events -> WordGuessingGame.startNewGame(events, gameId, wordToGuess) }
    { events -> WordGuessingGame.makeGuess(events, guess) } 
))
If you're using commands that takes and returns a "java.util.List" instead of a Stream, you can instead statically import "composeCommands" from "org.occurrent.application.composition.command.ListCommandComposition".

Events returned from WordGuessingGame.startNewGame(..) will be appended to the event stream when calling WordGuessingGame.makeGuess(..) and the new domain events returned by the two functions will be merged and written in an atomic fashion to the event store.

The command composition library also contains some utilities for partial function application that you can use to further enhance the example above (if you like). If you statically import partial method from org.occurrent.application.composition.command.partial.PartialStreamCommandApplication you can refactor the code above into this:

String gameId = ...
String wordToGuess = ...
String guess = ...
applicationService.execute(gameId, composeCommands(
    partial(WordGuessingGame::startNewGame, gameId, wordToGuess),
    partial(WordGuessingGame::makeGuess, guess)
));
val gameId = ...
val wordToGuess = ...
val guess = ...
applicationService.execute(gameId, composeCommands(
    partial(WordGuessingGame::startNewGame, gameId, wordToGuess)
    partial(WordGuessingGame::makeGuess, guess) 
))
If you're using commands that takes and returns a "java.util.List" instead of a Stream, you can instead statically import "partial" from "org.occurrent.application.composition.command.partial.PartialListCommandApplication".

Command Conversion

If you have an application service that takes a higher-order function in the form of Function<Stream<DomainEvent>, Stream<DomainEvent>> but your domain model is defined with list’s (Function<List<DomainEvent, List<DomainEvent>) then Occurrent provides means to easily convert between them. First you need to depend on the Command Composition library:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>command-composition</artifactId>
    <version>0.3.0</version>
</dependency>
compile 'org.occurrent:command-composition:0.3.0'
libraryDependencies += "org.occurrent" % "command-composition" % "0.3.0"
@Grab(group='org.occurrent', module='command-composition', version='0.3.0') 
[org.occurrent/command-composition "0.3.0"]
'org.occurrent:command-composition:jar:0.3.0'
<dependency org="org.occurrent" name="command-composition" rev="0.3.0" />

Let’s say you have a domain model defined like this:

public class WordGuessingGame {
    public List<DomainEvent> guessWord(List<DomainEvents> events, String guess) {
        // Implementation
    }
}
object WordGuessingGame {
    fun guessWord(events : List<DomainEvent>, guess : String) : List<DomainEvent> {
        // Implementation
    }    
}

But you application service takes a Function<Stream<DomainEvent>, Stream<DomainEvent>>:

public class ApplicationService {
    public void execute(String streamId, Function<Stream<DomainEvent>, Stream<DomainEvent>> functionThatCallsDomainModel) {
        // Implementation
    }
}

Then you can make use of the toStreamCommand in org.occurrent.application.composition.command.CommandConversion to call the domain function:

String guess = ...
applicationService.execute(gameId, toStreamCommand( events -> WordGuessingGame.makeGuess(events, guess)));
val guess = ...
applicationService.execute(gameId, toStreamCommand { events -> WordGuessingGame.makeGuess(events, guess) } )
You can also use the "toListCommand" method to convert a "Function<Stream<DomainEvent>, Stream<DomainEvent>>" into a "Function<List<DomainEvent>, List<DomainEvent>>"

Application Service

Occurrent provides a generic application service that is a good starting point for most use cases. First add the module as a dependency to your project:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>application-service-blocking</artifactId>
    <version>0.3.0</version>
</dependency>
compile 'org.occurrent:application-service-blocking:0.3.0'
libraryDependencies += "org.occurrent" % "application-service-blocking" % "0.3.0"
@Grab(group='org.occurrent', module='application-service-blocking', version='0.3.0') 
[org.occurrent/application-service-blocking "0.3.0"]
'org.occurrent:application-service-blocking:jar:0.3.0'
<dependency org="org.occurrent" name="application-service-blocking" rev="0.3.0" />

This module provides an interface, org.occurrent.application.service.blocking.ApplicationService, and a default implementation, org.occurrent.application.service.blocking.implementation.GenericApplicationService. The GenericApplicationService takes an EventStore and a org.occurrent.application.converter.CloudEventConverter implementation as parameters. The latter is used to convert domain events to and from cloud events when loaded/written to the event store. There’s a default implementation that you may decide to use called, org.occurrent.application.converter.implementation.GenericCloudEventConverter. For example:

Application Service Event Conversion

DomainEventConverter domainEventConverter = new DomainEventConverter(new ObjectMapper());
CloudEventConverter<DomainEvent> cloudEventConverter = new GenericCloudEventConverter<>(domainEventConverter::convertToDomainEvent, domainEventConverter::convertToCloudEvent);
val domainEventConverter = DomainEventConverter(ObjectMapper())
val cloudEventConverter = GenericCloudEventConverter(domainEventConverter::convertToDomainEvent, domainEventConverter::convertToCloudEvent)
ObjectMapper a class from the is Jackson library

Here’s an example of a DomainEventConverter (this is something you implement yourself):

import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;

import java.io.IOException;
import java.net.URI;

import static java.time.ZoneOffset.UTC;
import static org.occurrent.functional.CheckedFunction.unchecked;
import static org.occurrent.time.TimeConversion.toLocalDateTime;

public class DomainEventConverter {

    private final ObjectMapper objectMapper;

    public DomainEventConverter(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    public CloudEvent convertToCloudEvent(DomainEvent e) {
        try {
            return CloudEventBuilder.v1()
                    .withId(e.getEventId())
                    .withSource(URI.create("http://name"))
                    .withType(e.getClass().getName())
                    .withTime(LocalDateTime.ofInstant(e.getDate().toInstant(), UTC).atOffset(UTC))
                    .withSubject(e.getName())
                    .withDataContentType("application/json")
                    .withData(objectMapper.writeValueAsBytes(e))
                    .build();
        } catch (JsonProcessingException jsonProcessingException) {
            throw new RuntimeException(jsonProcessingException);
        }
    }

    public DomainEvent convertToDomainEvent(CloudEvent cloudEvent) {
        try {
            return (DomainEvent) objectMapper.readValue(cloudEvent.getData().toBytes(), Class.forName(cloudEvent.getType()));
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}        
While this implementation works for simple cases, make sure that you think before simply copying and pasting this class into your own code base. The reason is that you may not need to serialize all data in the domain event to the data field (some parts of the domain event, such as id and type, is already present in the cloud event), and the "type" field contains the fully-qualified name of the class which makes it more difficult to move without loosing backward compatibility. Also your domain events might not be serializable to JSON without conversion. For these reasons, it's recommended to create a more custom mapping between a cloud event and domain event.

See GenericApplicationServiceTest.java for an example.

If your domain model is using a CloudEvent (and not a custom domain event) then just pass a Function.identity() to the GenericCloudEventConverter:

CloudEventConverter<CloudEvent> cloudEventConverter = new GenericCloudEventConverter<>(Function.identity(), Function.identity());

Note that if the data content type in the CloudEvent is specified as “application/json” (or a json compatible content-type) then Occurrent will automatically store it as Bson in a MongoDB event store. The reason for this so that you’re able to query the data, either by the EventStoreQueries API, or manually using MongoDB queries. In order to do this, the byte[] passed to withData, will be converted into a org.bson.Document that is later written to the database. This is not optimal from a performance perspective. A more performant option would be to make use of the org.occurrent.eventstore.mongodb.cloudevent.DocumentCloudEventData class. This class implements the io.cloudevents.CloudEventData interface and allows passing a pre-baked org.bson.Document instance to it. Then no additional conversion will need to take place! Here’s an example:

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import org.occurrent.eventstore.mongodb.cloudevent.DocumentCloudEventData;
import org.bson.Document;

import java.io.IOException;
import java.net.URI;

import static java.time.ZoneOffset.UTC;
import static org.occurrent.functional.CheckedFunction.unchecked;
import static org.occurrent.time.TimeConversion.toLocalDateTime;

public class DomainEventConverter {

    public CloudEvent convertToCloudEvent(DomainEvent e) {  
            // Convert the data in the domain event into a Document 
            Document eventData = convertDataInDomainEventToMap(e);
            return CloudEventBuilder.v1()
                    .withId(e.getEventId())
                    .withSource(URI.create("http://name"))
                    .withType(e.getClass().getName())
                    .withTime(LocalDateTime.ofInstant(e.getDate().toInstant(), UTC).atOffset(UTC))
                    .withSubject(e.getName())
                    .withDataContentType("application/json")  
                    // Use the "eventData" Document to create an instance of DocumentCloudEventData.
                    .withData(new DocumentCloudEventData(document))
                    .build();
    }

    public DomainEvent convertToDomainEvent(CloudEvent cloudEvent) {
            Document document = ((DocumentCloudEventData) cloudEvent.getData()).getDocument();
            return convertToDomainEvent(cloudEvent, document);
    }       
    
    private static Document convertDataInDomainEventToDocument(DomainEvent e) {
        // Convert the domain event into a Map                
        Map<String, Object> data = ...
        return new Document(data);
    }

    private static DomainEvent convertToDomainEvent(CloudEvent cloudEvent, Document data) {
        // Re-construct the domain event instance from the cloud event and data
    }
}        

The drawback of using this approach is if you need to change to a different EventStore you would still need to depend on the org.occurrent:eventstore-mongodb-common module and make changes to convertToDomainEvent. Note that convertToCloudEvent doesn’t necessarily need to be changed since a CloudEventData implementation is always serializable into a byte[] that all event stores understands.

Using the Application Service

Now you can instantiate the (blocking) GenericApplicationService:

EventStore eventStore = ..
CloudEventConverter<DomainEvent> cloudEventConverter = ..
ApplicationService<DomainEvent> applicationService = new GenericApplicationService<>(eventStore, cloudEventConverter);
val eventStore : EventStore = ..
val cloudEventConverter : CloudEventConverter<DomainEvent> = ..
val applicationService : ApplicationService<DomainEvent> = GenericApplicationService(eventStore, cloudEventConverter)

You’re now ready to use the generic application service in your application. As an example let’s say you have a domain model with a method defined like this:

public class WordGuessingGame {
    public static Stream<DomainEvent> guessWord(Stream<DomainEvent> events, String guess) {
        // Implementation
    }    
}

You can call it using the application service:

applicationService.execute(gameId, events -> WordGuessingGame.guessWord(events, guess));
applicationService.execute(gameId) { events -> 
WordGuessingGame.guessWord(events, guess)
}

Application Service Side-Effects

The GenericApplicationService supports executing side-effects after the events returned from the domain model have been written to the event store. This is useful if you need to, for example, update a view synchronously after events have been written to the event store. Note that to perform side-effects (or policies) asynchronously you should use a subscription. As an example, consider that you want to synchronously register a game as ongoing when it is started. It may be defined like this:

public class RegisterOngoingGame {
    private final DatabaseApi someDatabaseApi;
    
    pubic RegisterOngoingGame(DatabaseApi someDatabaseApi) {
        this.someDatabaseApi = someDatabaseApi;
    }

    public void registerGameAsOngoingWhenGameWasStarted(GameWasStarted event) {
        // Add the id of the game started event to a set to handle duplicates and idempotency.
        someDatabaseApi.addToSet("ongoingGames", Map.of("gameId", e.gameId(), "date", e.getDate()));                
    }
}
class RegisterOngoingGame(private val someDatabaseApi : DatabaseApi) {
    fun registerGameAsOngoingWhenGameWasStarted(event : GameWasStarted) {
        // Add the id of the game started event to a set to handle duplicates and idempotency.
        someDatabaseApi.addToSet("ongoingGames", Map.of("gameId", e.gameId(), "date", e.getDate()));                
    }
}

There reason for doing this synchronously is, for example, if you have a REST API and the player expects the ongoing games view to be updated once the “start game” command has executed. This can be achieved by other means (RSocket, Websockets, server-sent events, polling) but synchronous updates is simple and works quite well in many cases.

Now that we have the code that registers ongoing games, we can call it from our from the application service:

RegisterOngoingGame registerOngoingGame = ..
applicationService.execute(gameId, events -> WordGuessingGame.guessWord(events, guess), events -> {
    events.filter(event -> event instanceof GameWasStarted)
         .findFirst()
         .map(GameWasStarted.class::cast)
         .ifPresent(registerOngoingGame::registerGameAsOngoingWhenGameWasStarted)
});
val registerOngoingGame : RegisterOngoingGame = ..
applicationService.execute(gameId, { events -> WordGuessingGame.guessWord(events, guess) }) { events -> 
    val gameWasStarted = events.find { event is GameWasStarted }
    if(gameWasStarted != null) {
        registerOngoingGame.registerGameAsOngoingWhenGameWasStarted(gameWasStarted) 
    }
}

Voila! Now registerGameAsOngoingWhenGameWasStarted will be called after the events returned from WordGuessingGame.guessWord(..) is written to the event store. You can however improve on the code above and make use of the executePolicy method shipped with the application service in the org.occurrent.application.service.blocking.PolicySideEffect. The code can then be refactored to this:

applicationService.execute(gameId, events -> WordGuessingGame.guessWord(events, guess), executePolicy(GameWasStarted.class, registerOngoingGame::registerGameAsOngoingWhenGameWasStarted)); 

If using the Kotlin extension functions (org.occurrent.application.service.blocking.executePolicy) you can write the code like this:

applicationService.execute(gameId, { events -> WordGuessingGame.guessWord(events, guess) }, executePolicy<GameWasStarted>(registerOngoingGame::registerGameAsOngoingWhenGameWasStarted))

Policies can also be composed:

RegisterOngoingGame registerOngoingGame  = ..
RemoveFromOngoingGamesWhenGameEnded removeFromOngoingGamesWhenGameEnded = ..

applicationService.execute(gameId, events -> WordGuessingGame.guessWord(events, guess), 
                           executePolicy(GameWasStarted.class, registerOngoingGame::registerGameAsOngoingWhenGameWasStarted)
                            .andThenExecuteAnotherPolicy(removeFromOngoingGamesWhenGameEnded::removeFromOngoingGamesWhenGameEnded));
val registerOngoingGame : RegisterOngoingGame = ..
val removeFromOngoingGamesWhenGameEnded : RemoveFromOngoingGamesWhenGameEnded = ..

applicationService.execute(gameId, { events -> WordGuessingGame.guessWord(events, guess) }, 
                            executePolicies(registerOngoingGame::registerGameAsOngoingWhenGameWasStarted, 
                                            removeFromOngoingGamesWhenGameEnded::removeFromOngoingGamesWhenGameEnded))

Application Service Transactional Side-Effects

In the example above, writing the events to the event store and executing policies is not an atomic operation. If your app crashes before after the call to registerOngoingGame::registerGameAsOngoingWhenGameWasStarted but before removeFromOngoingGamesWhenGameEnded::removeFromOngoingGamesWhenGameEnded, you will need to handle idempotency. But if your policies/side-effects are writing data to the same database as the event store you can make use of transactions to write everything atomically! This is very easy if you’re using a Spring EventStore. What you need to do is to wrap the ApplicationService provided by Occurrent in your own application service, something like this:

@Service
public class CustomApplicationServiceImpl implements ApplicationService<DomainEvent> {
	private final GenericApplicationService<DomainEvent> occurrentApplicationService;

	public CustomApplicationService(GenericApplicationService<DomainEvent> occurrentApplicationService) {
		this.occurrentApplicationService = occurrentApplicationService;
	}

	@Transactional
	@Override
    public void execute(String streamId, Function<Stream<DomainEvent>, Stream<DomainEvent>> functionThatCallsDomainModel, Consumer<Stream<DomainEvent>> sideEffect) {
		occurrentApplicationService.execute(gameId, functionThatCallsDomainModel, sideEffect);
    }
}
@Service
class CustomApplicationServiceImpl(val occurrentApplicationService:  GenericApplicationService<DomainEvent>) : ApplicationService<DomainEvent> {

	@Transactional
    override fun execute(streamId : String, functionThatCallsDomainModel: Function<Stream<DomainEvent>, Stream<DomainEvent>> , sideEffect : Consumer<Stream<DomainEvent>>) {
		occurrentApplicationService.execute(gameId, functionThatCallsDomainModel, sideEffect)
    }
}

Given that you’ve defined a MongoTransactionManager in your Spring Boot configuration (and using this when creating your event store instance) the side-effects and events are written atomically in the same transaction!

Application Service Kotlin Extensions

If you’re using Kotlin chances are that your domain model is using a Sequence instead of a java Stream:

object WordGuessingGame {
    fun guessWord(events : Sequence<DomainEvent>, guess : String) : Sequence<DomainEvent> {
        // Implementation
    }    
}

Occurrent provides a set of extension functions for Kotlin in this module:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>application-service-blocking-kotlin</artifactId>
    <version>0.3.0</version>
</dependency>
compile 'org.occurrent:application-service-blocking-kotlin:0.3.0'
libraryDependencies += "org.occurrent" % "application-service-blocking-kotlin" % "0.3.0"
@Grab(group='org.occurrent', module='application-service-blocking-kotlin', version='0.3.0') 
[org.occurrent/application-service-blocking-kotlin "0.3.0"]
'org.occurrent:application-service-blocking-kotlin:jar:0.3.0'
<dependency org="org.occurrent" name="application-service-blocking-kotlin" rev="0.3.0" />
This module depends on 'org.occurrent:application-service-blocking:0.3.0' so you don't need to depend on that module explicitly.

You can then use one of the org.occurrent.application.service.blocking.execute extension functions to do:

applicationService.execute(gameId) { events : Sequence<DomainEvent> -> 
    WordGuessingGame.guessWord(events, guess)
}

Sagas

A “saga” can be used to represent and coordinate a long-lived business transaction/process (where “long-lived” is kind of arbitrary). This is an advanced subject and you should try to avoid sagas if there are other means available to solve the problem (for example use policies if they are sufficient). Occurrent doesn’t provide or enforce any specific Saga implementation. But since Occurrent is a library you can hook in already existing solutions, for example:

The way to integrate Occurrent with any of these libraries/frameworks/patterns is to create a subscription that forwards the events written to the event store to the preferred library/framework/view.

Policy

A policy (or “reaction”) can be used to deal with workflows such as “whenever this happens, do that”. For example, whenever a game is won, send an email to the winner. For simple workflows like this there’s typically no need for a full-fledged saga.

Asynchronous Policy

In Occurrent, you can create asynchronous policies by creating a subscription. Let’s consider the example above:

public class WhenGameWonThenSendEmailToWinnerPolicy {

    private final BlockingSubscription subscription;
    private final EmailClient emailClient;
    private final Players players;
    
    public WhenGameWonThenSendEmailToWinnerPolicy(BlockingSubscription subscription, EmailClient emailClient, Players players) {
        this.subscription = subscription;
        this.emailClient = emailClient;
        this.players = players;
    }
    
    @PostConstruct
    public void whenGameWonThenSendEmailToWinner() {
        subscription.subscribe("whenGameWonThenSendEmailToWinnerPolicy", filter(type("GameWon")), cloudEvent -> {
            String playerEmailAddress =  players.whatIsTheEmailAddressOfPlayer(playerIdIn(cloudEvent.getData()));
            emailClient.sendEmail(playerEmailAddress, "You won, yaay!");
        }          
    }
}
class WhenGameWonThenSendEmailToWinnerPolicy(val subscription : BlockingSubscription, 
                                             val emailClient : EmailClient, val players : Players) {

    @PostConstruct
    fun whenGameWonThenSendEmailToWinner() = 
        subscription.subscribe("whenGameWonThenSendEmailToWinnerPolicy", filter(type("GameWon")) { cloudEvent -> 
            val playerEmailAddress =  players.whatIsTheEmailAddressOfPlayer(playerIdIn(cloudEvent.getData()))
            emailClient.sendEmail(playerEmailAddress, "You won, yaay!")
        }          
}

You could also create a generic policy that simply forwards all events to another piece of infrastructure. For example, you may wish to forward all events to rabbitmq (by publishing them) or Spring’s event infrastructure, and then create policies that subscribes to events from these systems instead. There’s an example in the github repository that shows an example of how one can achieve this.

You may also want to look into the “todo-list” pattern described in the automation section on the in the event modeling website.

Synchronous Policy

In some cases, for example if you have a simple website and you want views to be updated when a command is dispatched by a REST API, it can be useful to update a policy in a synchronous fashion. The application service provided by Occurrent allows for this, please see the application service documentation for an example.

Snapshots

Using snapshots is an advanced technique and it shouldn't be used unless it's really necessary.

Snapshotting is an optimization technique that can be applied if it takes too long to derive the current state from an event stream for each command. There are several ways to do this and Occurrent doesn’t enforce any particular strategy. One strategy is to use so called “snapshot events” (special events that contains a pre-calculated view of the state of an event stream at a particular time) and another technique is to write snapshots to another datastore than the event store.

The application service need to be modified to first load the up the snapshot and then load events that have not yet been materialized in the snapshot (if any).

Synchronous Snapshots

With Occurrent you can trade-off write speed for understandability. For example, let’s say that you want to update the snapshot on every write and it should be consistent with the writes to the event store. One way to do this is to use Spring’s transactional support:

public class ApplicationService {

    private final EventStore eventStore;
    private final SnapshotRepository snapshotRepository;
    
    public ApplicationService(EventStore eventStore,  SnapshotRepository snapshotRepository) {
        this.eventStore = eventStore;
        this.snapshotRepository = snapshotRepository;
    }
    
    @Transactional
    public void execute(String streamId, BiFunction<Snapshot, Stream<CloudEvent>, Stream<CloudEvent>> functionThatCallsDomainModel) {
        // Read snapshot from a the snapshot repsitory 
        Snapshot snapshot = snapshotRepsitory.findByStreamId(streamId);
        long snapshotVersion = snapshot.version();        
    
        // Read all events for "streamId" from snapshotVersion  
        EventStream<CloudEvent> eventStream = eventStore.read(streamId, snapshotVersion, Long.MAX_VALUE);

        // Call a pure function from the domain model which returns a Stream of new events  
        List<CloudEvent> newEvents = functionThatCallsDomainModel.apply(snapshot.state(), eventStream).collect(Collectors.toList());

        // Convert domain events to cloud events and write them to the event store  
        eventStore.write(streamId, eventStream.version(), newEvents.stream());
        
        // Update the snapshot
        Snapshot updatedSnapshot = snapshot.updateFrom(newEvents.stream(), eventStream.version());
        snapshotRepsitory.save(updatedSnapshot);
    }
}
class ApplicationService(val eventStore : EventStore, val snapshotRepository : SnapshotRepository) {
    
    @Transactional
    fun execute(String streamId, functionThatCallsDomainModel : (Snapshot, Stream<CloudEvent>) -> Stream<CloudEvent>) {
        // Read snapshot from a the snapshot repsitory 
        val snapshot : Snapshot = snapshotRepsitory.findByStreamId(streamId)
        long snapshotVersion = snapshot.streamVersion()        
    
        // Read all events for "streamId" from snapshotVersion  
        val eventStream = eventStore.read(streamId, snapshotVersion, Long.MAX_VALUE)

        // Call a pure function from the domain model which returns a Stream of new events  
        val newEvents = functionThatCallsDomainModel(snapshot.state(), eventStream).collect(Collectors.toList())

        // Convert domain events to cloud events and write them to the event store  
        eventStore.write(streamId, eventStream.version(), newEvents.stream())
        
        // Update the snapshot
        val updatedSnapshot = snapshot.updateFrom(newEvents.stream(), eventStream.version())
        snapshotRepsitory.save(updatedSnapshot)
    }
}
This is a somewhat simplified example but the idea is hopefully made clear.

It’s quite likely that you don’t need to update the snapshot in every write (like it’s done in the example above). If you only wish to update the snapshot for every third event then you could do:

if (eventStream.version() - snapshot.version() >= 3) {
    Snapshot updatedSnapshot = snapshot.updateFrom(newEvents.stream(), eventStream.version());
    snapshotRepsitory.save(updatedSnapshot);
}

Asynchronous Snapshots

As an alternative to synchronous and fully-consistent snapshots, you can update snapshots asynchronously. You do this by creating a subscription that updates the snapshot. For example:

public class UpdateSnapshotWhenNewEventsAreWrittenToEventStore {

    private final BlockingSubscription subscription;
    private final SnapshotRepository snapshotRepository;
    
    public UpdateSnapshotWhenNewEventsAreWrittenToEventStore(BlockingSubscription subscription, SnapshotRepository snapshotRepository) {
        this.subscription = subscription;
        this.snapshotRepository = snapshotRepository;
    }
    
    @PostConstruct
    public void updateSnapshotWhenNewEventsAreWrittenToEventStore() {
        subscription.subscribe("updateSnapshots", cloudEvent -> {
            String streamId = OccurrentExtensionGetter.getStreamId(cloudEvent);
            long streamVersion = OccurrentExtensionGetter.getStreamVersion(cloudEvent);
            
            Snapshot snapshot = snapshotRepository.findByStreamId(streamId);        
            snapshot.updateFrom(cloudEvent, streamVersion);
            snapshotRepository.save(snapshot);
        }          
    }
}
class UpdateSnapshotWhenNewEventsAreWrittenToEventStore(val subscription : BlockingSubscription, 
                                                        val snapshotRepository : SnapshotRepository) {

    @PostConstruct
    fun updateSnapshotWhenNewEventsAreWrittenToEventStore() {
        subscription.subscribe("updateSnapshots", { cloudEvent -> 
            String streamId = OccurrentExtensionGetter.getStreamId(cloudEvent)
            long streamVersion = OccurrentExtensionGetter.getStreamVersion(cloudEvent)
            
            Snapshot snapshot = snapshotRepository.findByStreamId(streamId)        
            snapshot.updateFrom(cloudEvent, streamVersion)
            snapshotRepository.save(snapshot)
        }          
    }
}

It’s quite likely that you don’t need to update the snapshot in every write (like it’s done in the example above). If you only wish to update the snapshot for every third event then you could do:

if (streamVersion - snapshot.version() >= 3) {
    Snapshot updatedSnapshot = snapshot.updateFrom(newEvents.stream(), eventStream.version());
    snapshotRepsitory.save(updatedSnapshot);
}

Closing the Books

This is a pattern that can be applied instead of updating snapshots for every n event. The idea is to try to keep event streams short and instead create snapshots periodically. For example, once every month we run a job that creates snapshots for certain event streams. This is especially well suited for problem domains where “closing the books” is a concept in the domain (such as accounting).

Scheduling

Scheduling (aka deadlines, alarm clock) is a very handy technique to schedule to commands to be executed in the future or periodically. Imagine, for example, a multiplayer game (like word guessing game shown in previous examples), where we want to game to end automatically after 10 hours of inactivity. This means that as soon as a player has made a guess, we’d like to schedule a “timeout game command” to be executed after 10 hours.

Occurrent doesn’t currently have any built-in support for this (but a small wrapper around an existing library is planned), but there are several libraries you can use from the Java ecosystem, such as:

Getting started

Getting started with Occurrent involves these steps:

It's recommended to read up on CloudEvent's and its specification so that you're familiar with the structure and schema of a CloudEvent.
  1. Choose an underlying datastore for an event store. Luckily there are only two choices at the moment, MongoDB and an in-memory implementation. Hopefully this will be a more difficult decision in the future :)
  2. Once a datastore has been decided it’s time to choose an EventStore implementation for this datastore since there may be more than one.
  3. If you need subscriptions (i.e. the ability to subscribe to changes from an EventStore) then you need to pick a library that implements this for the datastore that you’ve chosen. Again, there may be several implementations to choose from.
  4. If a subscriber needs to be able to continue from where it left off on application restart, it’s worth looking into a so called “position storage” library. These libraries provide means to automatically (or selectively) store the position for a subscriber to a datastore. Note that the datastore that stores this position can be a different datastore than the one used as EventStore. For example, you can use MongoDB as EventStore but store subscription positions in Redis.

Choosing An EventStore

There are currently two different datastores to choose from, MongoDB and In-Memory.

MongoDB

Uses MongoDB, version 4.2 or above, as the underlying datastore for the CloudEvents. All implementations use transactions to guarantee consistent writes (see WriteCondition). Each EventStore will automatically create a few indexes on startup to allow for fast consistent writes, optimistic concurrency control and to avoid duplicated events. These indexes can also be used in queries against the EventStore (see EventStoreQueries).

There are three different MongoDB EventStore implementations to choose from:

MongoDB Schema

All MongoDB EventStore implementations tries to stay as close as possible to the CloudEvent’s specification even in the persitence layer. Occurrent, by default, automatically adds a custom “Occurrent extension” to each cloud event that is written to an EventStore. The Occurrent CloudEvent Extension consists of these attributes:


Attribute Name Type Description
streamId         String         An id the uniquely identifies a particular event stream.
It’s used to determine which events belong to which stream.
streamVersion Long The id of the stream version for a particular event.
It’s used for optimistic concurrency control.

A json schema describing a complete Occurrent CloudEvent, as it will be persisted to a MongoDB collection, can be found here (a “raw” cloud event json schema can be found here for comparison).

Note that MongoDB will automatically add an _id field (which is not used by Occurrent). The reason why the CloudEvent id attribute is not stored as _id in MongoDB is that the id of a CloudEvent is not globally unique! The combination of id and source is a globally unique CloudEvent. Note also that _id will not be included when the CloudEvent is read from an EventStore.

Here’s an example of what you can expect to see the “events” collection when storing events in an EventStore backed by MongoDB (given that TimeRepresentation is set to DATE):

{
	"_id : ObjectId("5f4112a348b8da5305e41f57"),
	"specversion" : "1.0",
	"id" : "bdb8481f-9e8e-443b-80a4-5ef787f0f227",
	"source" : "urn:occurrent:domain:numberguessinggame",
	"type" : "NumberGuessingGameWasStarted",
	"subject" : "a1fc6ba1-7cd4-45cf-8dcc-b357fe23956d",
	"time" : ISODate("2020-08-22T14:42:11.712Z"),
	"data" : {
		"secretNumberToGuess" : 8,
		"startedBy" : "003ab97b-df79-4bf1-8c0c-08a5dd3701cf",
		"maxNumberOfGuesses" : 5
	},
	"streamId" : "a1fc6ba1-7cd4-45cf-8dcc-b357fe23956d",
	"streamVersion" : NumberLong(1)
}
{
	"_id" : ObjectId("5f4112a548b8da5305e41f58"),
	"specversion" : "1.0",
	"id" : "c1bfc3a5-1716-43ae-88a6-297189b1b5c7",
	"source" : "urn:occurrent:domain:numberguessinggame",
	"type" : "PlayerGuessedANumberThatWasTooSmall",
	"subject" : "a1fc6ba1-7cd4-45cf-8dcc-b357fe23956d",
	"time" : ISODate("2020-08-22T14:42:13.336Z"),
	"data" : {
		"guessedNumber" : 1,
		"playerId" : "003ab97b-df79-4bf1-8c0c-08a5dd3701cf"
	},
	"streamId" : "a1fc6ba1-7cd4-45cf-8dcc-b357fe23956d",
	"streamVersion" : NumberLong(2)
}

MongoDB Time Representation

The CloudEvents specification says that the time attribute, if present, must adhere to the RFC 3339 specification. To accommodate this in MongoDB, the time attribute must be persisted as a String. This by itself is not a problem, a problem only arise if you want to make time-based queries on the events persisted to a MongoDB-backed EventStore (using the EventStoreQueries interface). This is, quite obviously, because time-based queries on String’s are suboptimal (to say the least) and may lead to surprising results. What we would like to do is to persist the time attribute as a Date in MongoDB, but MongoDB internally represents a Date with only millisecond resolution (see here) and then the CloudEvent cannot be compliant with the RFC 3339 specification in all circumstances.

Because of the reasons described above, users of a MongoDB-backed EventStore implementation, must decide how the time attribute is to be represented in MongoDB when instantiating an EventStore implementation. This is done by passing a value from the org.occurrent.mongodb.timerepresentation.TimeRepresentation enum to an EventStoreConfig object that is then passed to the EventStore implementation. TimeRepresentation has these values:

Value Description
RFC_3339_STRING     Persist the time attribute as an RFC 3339 string. This string is able to represent both nanoseconds and a timezone so this is recommended for apps that need to store this information or if you are uncertain of whether this is required in the future.

DATE

Persist the time attribute as a MongoDB Date. The benefit of using this approach is that you can do range queries etc on the “time” field on the cloud event. This can be really useful for certain types on analytics or projections (such as show the 10 latest number of started games) without writing any custom code.

Note that if you choose to go with RFC_3339_STRING you always have the option of adding a custom attribute, named for example “date”, in which you represent the “time” attribute as a Date when writing the events to an EventStore. This way you have the ability to use the “time” attribute to re-construct the CloudEvent time attribute exactly as well as the ability to do custom time-queries on the “date” attribute. However, you cannot use the methods involving time-based queries when using the EventStoreQueries interface.

Important: There’s yet another option! If you don’t need nanotime precision (i.e you’re fine with millisecond precision) and you’re OK with always representing the “time” attribute in UTC, then you can use TimeRepresentation.DATE without loss of precision! This is why, if DATE is configured for the EventStore, Occurrent will refuse to store a CloudEvent that specifies nanotime and is not defined in UTC (so that there won’t be any surprises). I.e. using DATE and then doing this will throw an IllegalArgumentException:

var cloudEvent = new CloudEventBuilder().time(OffsetDateTime.now()). .. .build();

// Will throw exception since OffsetDateTime.now() will include nanoseconds by default in Java 9+
eventStore.write(Stream.of(cloudEvent));

Instead, you need to remove nano seconds do like this explicitly:

// Remove millis and make sure to use UTC as timezone                                          
var now = OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS).withOffsetSameInstant(ZoneOffset.UTC);
var cloudEvent = new CloudEventBuilder().time(now). .. .build();

// Now you can write the cloud event
eventStore.write(Stream.of(cloudEvent));

For more thoughts on this, refer to the architecture decision record on time representation in MongoDB.

MongoDB Indexes

Each MongoDB EventStore implementation creates a few indexes for the “events collection” the first time they’re instantiated. These are:

Name Properties Description
streamId ascending An index for the streamId property. Allows for fast reads of all events in a particular stream.
id + source ascending id,
descending source,  
unique

Compound index of id and source to comply with the specification that the id+source combination must be unique.
streamId + streamVersion   ascending streamId,
descending streamVersion,
unique
Compound index of streamId and streamVersion (Occurrent CloudEvent extension) used for fast retrieval of the latest cloud event in a stream.

To allow for fast queries, for example when using EventStoreQueries, it’s recommended to create additional indexes tailored to the querying behavior of your application. See MongoDB indexes for more information on how to do this. If you have many adhoc queries it’s also worth checking out wildcard indexes which is a new feature in MongoDB 4.2. These allow you to create indexes that allow for arbitrary queries on e.g. the data attribute of a cloud event (if data is stored in json/bson format).

MongoDB EventStore Implementations

There are three different MongoDB EventStore implementations to choose from:

EventStore with MongoDB Native Driver

What is it?

An EventStore implementation that uses the “native” Java MongoDB synchronous driver (see website) to read and write CloudEvent’s to MongoDB.

When to use?

Use when you don’t need Spring support and want to use MongoDB as the underlying datastore.

Dependencies

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>eventstore-mongodb-native</artifactId>
    <version>0.3.0</version>
</dependency>
compile 'org.occurrent:eventstore-mongodb-native:0.3.0'
libraryDependencies += "org.occurrent" % "eventstore-mongodb-native" % "0.3.0"
@Grab(group='org.occurrent', module='eventstore-mongodb-native', version='0.3.0') 
[org.occurrent/eventstore-mongodb-native "0.3.0"]
'org.occurrent:eventstore-mongodb-native:jar:0.3.0'
<dependency org="org.occurrent" name="eventstore-mongodb-native" rev="0.3.0" />

Getting Started

Once you’ve imported the dependencies you create a new instance of org.occurrent.eventstore.mongodb.nativedriver.MongoEventStore. It takes four arguments, a MongoClient, the “database” and “event collection “that the EventStore will use to store events as well as an org.occurrent.eventstore.mongodb.nativedriver.EventStoreConfig.

For example:

MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
// How the CloudEvent "time" property will be serialized in MongoDB! !!Important!!
EventStoreConfig config = new EventStoreConfig(TimeRepresentation.RFC_3339_STRING);
String mongoDatabase = "database";
String mongoEventCollection = "events";

MongoEventStore eventStore = new MongoEventStore(mongoClient, mongoDatabase, mongoEventCollection, config);
val mongoClient = MongoClients.create("mongodb://localhost:27017")
// How the CloudEvent "time" property will be serialized in MongoDB! !!Important!!
val config = EventStoreConfig(TimeRepresentation.RFC_3339_STRING)
val mongoDatabase = "database"
val mongoEventCollection = "events"

val eventStore = MongoEventStore(mongoClient, mongoDatabase, mongoEventCollection, config)

Now you can start reading and writing events to the EventStore:

CloudEvent event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }".getBytes(StandardCharsets.UTF_8))
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event));

// Read
EventStream<CloudEvent> eventStream = eventStore.read("streamId");
val event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }".toByteArray())
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event))

// Read
val eventStream : EventStream<CloudEvent> = eventStore.read("streamId")

Examples

Name Description
Number Guessing Game A simple game implemented using a pure domain model and stores events in MongoDB using MongoEventStore. It also generates integration events and publishes these to RabbitMQ.

EventStore with Spring MongoTemplate (Blocking)

What is it?

An implementation that uses Spring’s MongoTemplate to read and write events to/from MongoDB.

When to use?

If you’re already using Spring and you don’t need reactive support then this is a good choice. You can make use of the @Transactional annotation to write events and views in the same transaction (but make sure you understand what you’re going before attempting this).

Dependencies

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>eventstore-mongodb-spring-blocking</artifactId>
    <version>0.3.0</version>
</dependency>
compile 'org.occurrent:eventstore-mongodb-spring-blocking:0.3.0'
libraryDependencies += "org.occurrent" % "eventstore-mongodb-spring-blocking" % "0.3.0"
@Grab(group='org.occurrent', module='eventstore-mongodb-spring-blocking', version='0.3.0') 
[org.occurrent/eventstore-mongodb-spring-blocking "0.3.0"]
'org.occurrent:eventstore-mongodb-spring-blocking:jar:0.3.0'
<dependency org="org.occurrent" name="eventstore-mongodb-spring-blocking" rev="0.3.0" />

Getting Started

Once you’ve imported the dependencies you create a new instance of org.occurrent.eventstore.mongodb.spring.blocking.SpringBlockingMongoEventStore. It takes two arguments, a MongoTemplate and an org.occurrent.eventstore.mongodb.spring.blocking.EventStoreConfig.

For example:

MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
MongoTransactionManager mongoTransactionManager = new MongoTransactionManager(new SimpleMongoClientDatabaseFactory(mongoClient, "database"));

MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, "database");
EventStoreConfig eventStoreConfig = new EventStoreConfig.Builder()
                                                         // The collection where all events will be stored 
                                                        .eventStoreCollectionName("events")
                                                        .transactionConfig(mongoTransactionManager)
                                                        // How the CloudEvent "time" property will be serialized in MongoDB! !!Important!! 
                                                        .timeRepresentation(TimeRepresentation.RFC_3339_STRING)
                                                        .build();

SpringBlockingMongoEventStore eventStore = new SpringBlockingMongoEventStore(mongoTemplate, eventStoreConfig);
val mongoClient = MongoClients.create("mongodb://localhost:27017")
val mongoTransactionManager = MongoTransactionManager(SimpleMongoClientDatabaseFactory(mongoClient, "database"))

val mongoTemplate = MongoTemplate(mongoClient, "database")
val eventStoreConfig = EventStoreConfig.Builder()
                                        // The collection where all events will be stored
                                       .eventStoreCollectionName("events")
                                       .transactionConfig(mongoTransactionManager)
                                        // How the CloudEvent "time" property will be serialized in MongoDB! !!Important!!
                                       .timeRepresentation(TimeRepresentation.RFC_3339_STRING)
                                       .build()

val eventStore = SpringBlockingMongoEventStore(mongoTemplate, eventStoreConfig)

Now you can start reading and writing events to the EventStore:

CloudEvent event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }".getBytes(StandardCharsets.UTF_8))
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event));

// Read
EventStream<CloudEvent> eventStream = eventStore.read("streamId");
val event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }".toByteArray())
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event))

// Read
val eventStream : EventStream<CloudEvent> = eventStore.read("streamId")

Examples

Name Description
Number Guessing Game A simple game implemented using a pure domain model and stores events in MongoDB using SpringBlockingMongoEventStore and Spring Boot. It also generates integration events and publishes these to RabbitMQ.
Subscription View An example showing how to create a subscription that listens to certain events stored in the EventStore and updates a view/projection from these events.
Transactional View An example showing how to combine writing events to the SpringBlockingMongoEventStore and update a view transactionally using the @Transactional annotation.
Custom Aggregation View Example demonstrating that you can query the SpringBlockingMongoEventStore using custom MongoDB aggregations.

EventStore with Spring ReactiveMongoTemplate (Reactive)

What is it?

An implementation that uses Spring’s ReactiveMongoTemplate to read and write events to/from MongoDB.

When to use?

If you’re already using Spring and want to use the reactive driver (project reactor) then this is a good choice. It uses the ReactiveMongoTemplate to write events to MongoDB. You can make use of the @Transactional annotation to write events and views in the same tx (but make sure that you understand what you’re going before attempting this).

Dependencies

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>eventstore-mongodb-spring-reactor</artifactId>
    <version>0.3.0</version>
</dependency>
compile 'org.occurrent:eventstore-mongodb-spring-reactor:0.3.0'
libraryDependencies += "org.occurrent" % "eventstore-mongodb-spring-reactor" % "0.3.0"
@Grab(group='org.occurrent', module='eventstore-mongodb-spring-reactor', version='0.3.0') 
[org.occurrent/eventstore-mongodb-spring-reactor "0.3.0"]
'org.occurrent:eventstore-mongodb-spring-reactor:jar:0.3.0'
<dependency org="org.occurrent" name="eventstore-mongodb-spring-reactor" rev="0.3.0" />

Getting Started

Once you’ve imported the dependencies you create a new instance of org.occurrent.eventstore.mongodb.spring.blocking.SpringBlockingMongoEventStore. It takes two arguments, a MongoTemplate and an org.occurrent.eventstore.mongodb.spring.blocking.EventStoreConfig.

For example:

MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
MongoTransactionManager mongoTransactionManager = new MongoTransactionManager(new SimpleMongoClientDatabaseFactory(mongoClient, "database"));

MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, "database");
EventStoreConfig eventStoreConfig = new EventStoreConfig.Builder()
                                                         // The collection where all events will be stored 
                                                        .eventStoreCollectionName("events")
                                                        .transactionConfig(mongoTransactionManager)
                                                        // How the CloudEvent "time" property will be serialized in MongoDB! !!Important!! 
                                                        .timeRepresentation(TimeRepresentation.RFC_3339_STRING)
                                                        .build();

SpringBlockingMongoEventStore eventStore = new SpringBlockingMongoEventStore(mongoTemplate, eventStoreConfig);
val mongoClient = MongoClients.create("mongodb://localhost:27017")
val mongoTransactionManager = MongoTransactionManager(SimpleMongoClientDatabaseFactory(mongoClient, "database"))

val mongoTemplate = MongoTemplate(mongoClient, "database")
val eventStoreConfig = EventStoreConfig.Builder()
                                        // The collection where all events will be stored
                                       .eventStoreCollectionName("events")
                                       .transactionConfig(mongoTransactionManager)
                                        // How the CloudEvent "time" property will be serialized in MongoDB! !!Important!!
                                       .timeRepresentation(TimeRepresentation.RFC_3339_STRING)
                                       .build()

val eventStore = SpringBlockingMongoEventStore(mongoTemplate, eventStoreConfig)

Now you can start reading and writing events to the EventStore:

CloudEvent event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }".getBytes(StandardCharsets.UTF_8))
                    .build();

// Write                    
Mono<Void> mono = eventStore.write("streamId", Flux.just(event));

// Read
Mono<EventStream<CloudEvent>> eventStream = eventStore.read("streamId");
val event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }".toByteArray())
                    .build();

// Write                    
val mono = eventStore.write("streamId", Flux.just(event))

// Read
val eventStream : Mono<EventStream<CloudEvent>> = eventStore.read("streamId")

Examples

Name Description
Custom Aggregation View Example demonstrating that you can query the SpringBlockingMongoEventStore using custom MongoDB aggregations.

In-Memory EventStore

What is it?

A simple in-memory implementation of the EventStore interface.

When to use?

Mainly for testing purposes or for integration tests that doesn’t require a durable event store.

Dependencies

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>eventstore-inmemory</artifactId>
    <version>0.3.0</version>
</dependency>
compile 'org.occurrent:eventstore-inmemory:0.3.0'
libraryDependencies += "org.occurrent" % "eventstore-inmemory" % "0.3.0"
@Grab(group='org.occurrent', module='eventstore-inmemory', version='0.3.0') 
[org.occurrent/eventstore-inmemory "0.3.0"]
'org.occurrent:eventstore-inmemory:jar:0.3.0'
<dependency org="org.occurrent" name="eventstore-inmemory" rev="0.3.0" />

Getting Started

Once you’ve imported the dependencies you create a new instance of org.occurrent.eventstore.inmemory.InMemoryEventStore. For example:

InMemoryEventStore eventStore = new InMemoryEventStore();
val eventStore = InMemoryEventStore()

Now you can start reading and writing events to the EventStore:

CloudEvent event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }".getBytes(StandardCharsets.UTF_8))
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event));

// Read
EventStream<CloudEvent> eventStream = eventStore.read("streamId");
val event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }".toByteArray())
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event))

// Read
val eventStream : EventStream<CloudEvent> = eventStore.read("streamId")

Note that InMemoryEventStore doesn’t support EventStoreQueries.

Using Subscriptions

Before you start using subscriptions you should read up on what they are here.

There a two different kinds of subscriptions, blocking subscriptions and reactive subscriptions. For blocking subscription implementations see here and for reactive subscription implementations see here.

Blocking Subscriptions

A “blocking subscription” is a subscription that uses the normal Java threading mechanism for IO operations, i.e. reading changes from an EventStore will block the thread. This is arguably the easiest and most familiar way to use subscriptions for the typical Java developer, and it’s probably good-enough for most scenarios. If high throughput, low CPU and memory-consumption is critical then consider using reactive subscription instead. Reactive subscriptions are also better suited if you want to work with streaming data.

All blocking subscriptions implements the org.occurrent.subscription.api.blocking.BlockingSubscription interface. This interface provide means to subscribe to new events from an EventStore as they are written. For example:

subscription.subscribe("mySubscriptionId", System.out::println);
subscription.subscribe("mySubscriptionId", ::println)

This will simply print each cloud event written to the event store to the console.

Note that the signature of subscribe is defined like this:

public interface BlockingSubscription {
    /**
     * Start listening to cloud events persisted to the event store using the supplied start position and <code>filter</code>.
     *
     * @param subscriptionId  The id of the subscription, must be unique!
     * @param filter          The filter used to limit which events that are of interest from the EventStore.
     * @param startAtSupplier A supplier that returns the start position to start the subscription from.
     *                        This is a useful alternative to just passing a fixed "StartAt" value if the stream is broken and re-subscribed to.
     *                        In these cases, streams should be restarted from the latest persisted position and not the start position as it
     *                        were when the application was first started.
     * @param action          This action will be invoked for each cloud event that is stored in the EventStore.
     */
    Subscription subscribe(String subscriptionId, SubscriptionFilter filter, Supplier<StartAt> startAtSupplier, Consumer<CloudEvent> action);

    // Default methods 

}

It’s common that subscriptions produce “wrappers” around the vanilla io.cloudevents.CloudEvent type that includes the subscription position (if the datastore doesn’t maintain the subscription position on behalf of the clients). Someone, either you as the client or the datastore, needs to keep track of this position for each individual subscriber (“mySubscriptionId” in the example above). If the datastore doesn’t provide this feature, you should use a BlockingSubscription implementation that also implement the org.occurrent.subscription.api.blocking.PositionAwareBlockingSubscription interface. The PositionAwareBlockingSubscription is an example of a BlockingSubscription that returns a wrapper around io.cloudevents.CloudEvent called org.occurrent.subscription.PositionAwareCloudEvent which adds an additional method, SubscriptionPosition getStreamPosition(), that you can use to get
the current subscription position. You can check if a cloud event contains a subscription position by calling PositionAwareCloudEvent.hasSubscriptionPosition(cloudEvent) and then get the position by using PositionAwareCloudEvent.getSubscriptionPositionOrThrowIAE(cloudEvent). Note that PositionAwareCloudEvent is fully compatible with io.cloudevents.CloudEvent and it’s ok to treat it as such. So given that you’re subscribing from a PositionAwareBlockingSubscription, you are responsible for keeping track of the subscription position, so that it’s possible to resume this subscription from the last known position on application restart. This interface also provides means to get the so called “current global subscription position”, by calling the globalSubscriptionPosition method which can be useful when starting a new subscription.

For example, consider the case when subscription “A” starts subscribing at the current time (T1). Event E1 is written to the EventStore and propagated to subscription “A”. But imagine there’s a bug in “A” that prevents it from performing its action. Later, the bug is fixed and the application is restarted at the “current time” (T2). But since T2 is after T1, E1 will not sent to “A” again since it happened before T2. Thus this event is missed! Whether or not this is actually a problem depends on your use case. But to avoid it you should not start the subscription at the “current time”, but rather from the “global subscription position”. This position should be written to a subscription position storage before subscription “A” is started. Thus the subscription can continue from this position on application restart and no events will be missed.

Blocking Subscription Filters

You can also provide a subscription filter, applied at the datastore level so that it’s really efficient, if you’re only interested in certain events:

subscription.subscribe("mySubscriptionId", filter(type("GameEnded")), System.out::println);
subscription.subscribe("mySubscriptionId", filter(type("GameEnded")), ::println)

This will print each cloud event written to the event store, and has type equal to “GameEnded”, to the console. The filter method is statically imported from org.occurrent.subscription.OccurrentSubscriptionFilter and type is statically imported from org.occurrent.condition.Condition. The OccurrentSubscriptionFilter is generic and should be applicable to a wide variety of different datastores. However, subscription implementations may provide different means to express filters. For example, the MongoDB subscription implementations allows you to use filters specific to MongoDB:

subscription.subscribe("mySubscriptionId", filter().id(Filters::eq, "3c0364c3-f4a7-40d3-9fb8-a4a62d7f66e3").type(Filters::eq, "GameStarted")), System.out::println);
subscription.subscribe("mySubscriptionId", filter().id(Filters::eq, "3c0364c3-f4a7-40d3-9fb8-a4a62d7f66e3").type(Filters::eq, "GameStarted")), ::println)

Now filter is statically imported from org.occurrent.subscription.mongodb.MongoDBFilterSpecification and Filters is imported from com.mongodb.client.model.Filters (i.e the normal way to express filters in MongoDB). However, it’s recommended to always start with an OccurrentSubscriptionFilter and only pick a more specific implementation if you cannot express your filter using the capabilities of OccurrentSubscriptionFilter.

Blocking Subscription Start Position

A subscription can can be started at different locations in the event store. You can define where to start when a subscription is started. This is done by supplying a org.occurrent.subscription.StartAt instance. It provides two ways to specify the start position, either by using StartAt.now() (default if StartAt is not defined when calling the subscribe function), or StartAt.subscriptionPosition(<subscriptionPosition>), where <subscriptionPosition> is a datastore-specific implementation of the org.occurrent.subscription.SubscriptionPosition interface which provides the start position as a String. You may want to store the String returned by a SubscriptionPosition in a database so that it’s possible to resume a subscription from the last processed position on application restart. You can do this anyway you like, but for most cases you probably should consider if there’s a Subscription Position Storage available that suits your needs. If not, you can still have a look at them for inspiration on how to write your own.

Blocking Subscription Position Storage

It’s very common that an application needs to start at its last known location in the subscription stream when it’s restarted. While you’re free to store the subscription position provided by a blocking subscription any way you like, Occurrent provides an interface called org.occurrent.subscription.api.blocking.BlockingSubscriptionPositionStorage acts as a uniform abstraction for this purpose. A BlockingSubscriptionPositionStorage is defined like this:

public interface BlockingSubscriptionPositionStorage {
    SubscriptionPosition read(String subscriptionId);
    SubscriptionPosition save(String subscriptionId, SubscriptionPosition subscriptionPosition);
    void delete(String subscriptionId);
}

I.e. it’s a way to read/write/delete the SubscriptionPosition for a given subscription. Occurrent ships with three pre-defined implementations:

1. BlockingSubscriptionPositionStorageForMongoDB
Uses the vanilla MongoDB Java (sync) driver to store SubscriptionPosition’s in MongoDB.

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-mongodb-native-blocking-position-storage</artifactId>
    <version>0.3.0</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-native-blocking-position-storage:0.3.0'
libraryDependencies += "org.occurrent" % "subscription-mongodb-native-blocking-position-storage" % "0.3.0"
@Grab(group='org.occurrent', module='subscription-mongodb-native-blocking-position-storage', version='0.3.0') 
[org.occurrent/subscription-mongodb-native-blocking-position-storage "0.3.0"]
'org.occurrent:subscription-mongodb-native-blocking-position-storage:jar:0.3.0'
<dependency org="org.occurrent" name="subscription-mongodb-native-blocking-position-storage" rev="0.3.0" />

2. SpringBlockingSubscriptionPositionStorageForMongoDB
Uses the Spring MongoTemplate to store SubscriptionPosition’s in MongoDB.

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-mongodb-spring-blocking-position-storage</artifactId>
    <version>0.3.0</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-spring-blocking-position-storage:0.3.0'
libraryDependencies += "org.occurrent" % "subscription-mongodb-spring-blocking-position-storage" % "0.3.0"
@Grab(group='org.occurrent', module='subscription-mongodb-spring-blocking-position-storage', version='0.3.0') 
[org.occurrent/subscription-mongodb-spring-blocking-position-storage "0.3.0"]
'org.occurrent:subscription-mongodb-spring-blocking-position-storage:jar:0.3.0'
<dependency org="org.occurrent" name="subscription-mongodb-spring-blocking-position-storage" rev="0.3.0" />

3. SpringBlockingSubscriptionPositionStorageForRedis
Uses the Spring RedisTemplate to store SubscriptionPosition’s in Redis.

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-redis-spring-blocking-position-storage</artifactId>
    <version>0.3.0</version>
</dependency>
compile 'org.occurrent:subscription-redis-spring-blocking-position-storage:0.3.0'
libraryDependencies += "org.occurrent" % "subscription-redis-spring-blocking-position-storage" % "0.3.0"
@Grab(group='org.occurrent', module='subscription-redis-spring-blocking-position-storage', version='0.3.0') 
[org.occurrent/subscription-redis-spring-blocking-position-storage "0.3.0"]
'org.occurrent:subscription-redis-spring-blocking-position-storage:jar:0.3.0'
<dependency org="org.occurrent" name="subscription-redis-spring-blocking-position-storage" rev="0.3.0" />

If you want to roll your own implementation (feel free to contribute to the project if you do) you can depend on the “blocking subscription API” which contains the BlockingSubscriptionPositionStorage interface:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-api-blocking</artifactId>
    <version>0.3.0</version>
</dependency>
compile 'org.occurrent:subscription-api-blocking:0.3.0'
libraryDependencies += "org.occurrent" % "subscription-api-blocking" % "0.3.0"
@Grab(group='org.occurrent', module='subscription-api-blocking', version='0.3.0') 
[org.occurrent/subscription-api-blocking "0.3.0"]
'org.occurrent:subscription-api-blocking:jar:0.3.0'
<dependency org="org.occurrent" name="subscription-api-blocking" rev="0.3.0" />

Blocking Subscription Implementations

These are the non-durable blocking subscription implementations:

MongoDB

It's important to recognize that MongoDB subscriptions are using the oplog so you need to make sure that you have enough oplog capacity to support your use case. You can also read more about this here. Typically this shouldn't be a problem, but if you have subscribers that risk falling behind, you may consider piping the events to e.g. Kafka and leverage it for these types of subscriptions. Note that you can always use a catch-up subscription to recover if this happens.

By “non-durable” we mean implementations that doesn’t store the subscription position in a durable storage automatically.
It might be that the datastore does this automatically or that subscription position storage is not required for your use case. If the datastore doesn’t support storing the subscription position automatically, a subscription will typically implement the org.occurrent.subscription.api.blocking.PositionAwareBlockingSubscription interface (since these types of subscriptions doesn’t need to be aware of the position).

Typically, if you want the stream to continue where it left off on application restart you want to store away the subscription position. You can do this anyway you like, but for most cases you probably want to look into implementations of org.occurrent.subscription.api.blocking.PositionAwareBlockingSubscription. These subscriptions can be combined with a subscription position storage implementation to store the position in a durable datastore.

Occurrent provides a utility that combines a PositionAwareBlockingSubscription and a BlockingSubscriptionPositionStorage (see here) to automatically store the subscription position
after each processed event. If you don’t want the position to be persisted after every event, it’s recommended to pick a subscription position storage implementation, and store the position yourself when you find fit.

Blocking Subscription using the “Native” Java MongoDB Driver

Uses the vanilla Java MongoDB synchronous driver (no Spring dependency is required).

To get started first include the following dependency:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-mongodb-native-blocking</artifactId>
    <version>0.3.0</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-native-blocking:0.3.0'
libraryDependencies += "org.occurrent" % "subscription-mongodb-native-blocking" % "0.3.0"
@Grab(group='org.occurrent', module='subscription-mongodb-native-blocking', version='0.3.0') 
[org.occurrent/subscription-mongodb-native-blocking "0.3.0"]
'org.occurrent:subscription-mongodb-native-blocking:jar:0.3.0'
<dependency org="org.occurrent" name="subscription-mongodb-native-blocking" rev="0.3.0" />

Then create a new instance of BlockingSubscriptionForMongoDB and start subscribing:

MongoTemplate mongoTemplate = mongoClient.getDatabase("some-database");
// Create the blocking subscription
BlockingSubscription subscriptions = new BlockingSubscriptionForMongoDB(database, "eventCollection", TimeRepresentation.DATE, Executors.newCachedThreadPool(), RetryStrategy.fixed(200));

// Now you can create subscriptions instances that subscribes to new events as they're written to an EventStore
subscriptions.subscribe("mySubscriptionId", cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent)); 

// You can later cancel the subscription by calling:
subscriptions.cancelSubscription("mySubscriptionId");
val database = mongoClient.getDatabase("some-database")
// Create the blocking subscription
val subscriptions = BlockingSubscriptionForMongoDB(database, "eventCollection", TimeRepresentation.DATE, Executors.newCachedThreadPool(), RetryStrategy.fixed(200))

// Now you can create subscriptions instances that subscribes to new events as they're written to an EventStore
subscriptions.subscribe("mySubscriptionId") { cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent) }

// You can later cancel the subscription by calling:
subscriptions.cancelSubscription("mySubscriptionId")
BlockingSubscriptionForMongoDB can be imported from the "org.occurrent.subscription.mongodb.nativedriver.blocking" package.

There are a few things to note here that needs explaining. First we have the TimeRepresentation.DATE that is passed as the third constructor argument which you can read more about here. Secondly we have the Executors.newCachedThreadPool(). A thread will be created from this executor for each call to “subscribe” (i.e. for each subscription). Make sure that you have enough threads to cover all your subscriptions or the “BlockingSubscription” may hang. Last we have the RetryStrategy which defines what should happen if there’s e.g. a connection issue during the life-time of a subscription or if subscription fails to process a cloud event (i.e. the action throws an exception). These retry strategies are available:

Name Description
none Don’t retry! Instead the subscription will fail fast and not continue if there’s an error.
fixed Retry operation after a fixed number of milliseconds or Duration.
backoff     Retry after with exponential backoff if an action throws an exception.

Note that you can provide a filter, start position and position persistence for this subscription implementation.

Blocking Subscription using Spring MongoTemplate

An implementation that uses Spring’s MongoTemplate for event subscriptions.

First include the following dependency:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-mongodb-spring-blocking</artifactId>
    <version>0.3.0</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-spring-blocking:0.3.0'
libraryDependencies += "org.occurrent" % "subscription-mongodb-spring-blocking" % "0.3.0"
@Grab(group='org.occurrent', module='subscription-mongodb-spring-blocking', version='0.3.0') 
[org.occurrent/subscription-mongodb-spring-blocking "0.3.0"]
'org.occurrent:subscription-mongodb-spring-blocking:jar:0.3.0'
<dependency org="org.occurrent" name="subscription-mongodb-spring-blocking" rev="0.3.0" />

Then create a new instance of SpringBlockingSubscriptionForMongoDB and start subscribing:

MongoTemplate mongoTemplate = ...
// Create the blocking subscription
BlockingSubscription subscriptions = new SpringBlockingSubscriptionForMongoDB(mongoTemplate, "eventCollectionName", TimeRepresentation.RFC_3339_STRING);

// Now you can create subscriptions instances that subscribes to new events as they're written to an EventStore
// Typically you do this after the Spring application context has finished loading. For example by subscribing to 
// the  (org.springframework.boot.context.event.ApplicationStartedEvent) or in a method annotated with (@PostConstruct) 
subscriptions.subscribe("mySubscriptionId", cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent)); 

// You can later cancel the subscription by calling:
subscriptions.cancelSubscription("mySubscriptionId");
val mongoTemplate : MongoTemplate = ... 
// Create the blocking subscription
val subscriptions = SpringBlockingSubscriptionForMongoDB(mongoTemplate, "eventCollectionName", TimeRepresentation.RFC_3339_STRING)

// Now you can create subscriptions instances that subscribes to new events as they're written to an EventStore
subscriptions.subscribe("mySubscriptionId") { cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent) }

// You can later cancel the subscription by calling:
subscriptions.cancelSubscription("mySubscriptionId")
SpringBlockingSubscriptionForMongoDB can be imported from the "org.occurrent.subscription.mongodb.spring.blocking" package.

The “eventCollectionName” specifies the event collection in MongoDB where events are stored. It’s important that this collection is the same as the collection used by the EventStore implementation. Secondly, we have the TimeRepresentation.RFC_3339_STRING that is passed as the third constructor argument, which you can read more about here. It’s also very important that this is configured the same way as the EventStore.

It should also be noted that Spring takes care of re-attaching to MongoDB if there’s a connection issue or other transient errors. This can be configured when creating the MongoTemplate instance.

When it comes to retries, if the “action” fails (i.e. if the higher-order function you provide when calling subscribe throws an exception), Occurrent doesn’t provide retries out of the box. This is most likely something you want to add, and since you’re using Spring you probably want to look into Spring Retry. For example consider that you have a simple Spring bean that writes each cloud event to a repository:

@Component
public class WriteToRepository {

	private final SomeRepository someRepository;

	public WriteToRepository(SomeRepository someRepository) {
		this.someRepository = someRepository;
	}

	public void write(CloudEvent cloudEvent) {
		someRepository.persist(cloudEvent);
	}
}

And you want to subscribe to all “GameStarted” events and write them to the repository:

WriteToRepository writeToRepository = ...
subscriptions.subscribe("gameStartedLog", writeToRepository::write);

But if the connection to someRepository is flaky you can add Spring Retry so allow for retry with exponential backoff:

@Component
public class WriteToRepository {

	private final SomeRepository someRepository;

	public WriteToRepository(SomeRepository someRepository) {
		this.someRepository = someRepository;
	}

    @Retryable(backoff = @Backoff(delay = 200, multiplier = 2, maxDelay = 30000))
	public void write(CloudEvent cloudEvent) {
		someRepository.persist(cloudEvent);
	}
}

Don’t forget to add @EnableRetry in to your Spring Boot application as well.

Note that you can provide a filter, start position and position persistence for this subscription implementation.

Automatic Subscription Position Persistence (Blocking)

Storing the subscription position is useful if you need to resume a subscription from its last known position when restarting an application. Occurrent provides a utility that implements BlockingSubscription and combines a PositionAwareBlockingSubscription and a BlockingSubscriptionPositionStorage implementation (see here) to automatically store the subscription position, by default,
after each processed event. If you don’t want the position to be persisted after every event, you can control how often this should happen by supplying a predicate to BlockingSubscriptionWithAutomaticPositionPersistenceConfig. There’s a pre-defined predicate, org.occurrent.subscription.util.predicate.EveryN, that allow
the position to be stored for every n event instead of simply every event. There’s also a shortcut, e.g. new BlockingSubscriptionWithAutomaticPositionPersistenceConfig(3) that creates an instance of EveryN that stores the position for every third event.

If you want full control, it’s recommended to pick a subscription position storage implementation, and store the position yourself using its API.

To use it, first we need to add the dependency:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-util-blocking-automatic-position-persistence</artifactId>
    <version>0.3.0</version>
</dependency>
compile 'org.occurrent:subscription-util-blocking-automatic-position-persistence:0.3.0'
libraryDependencies += "org.occurrent" % "subscription-util-blocking-automatic-position-persistence" % "0.3.0"
@Grab(group='org.occurrent', module='subscription-util-blocking-automatic-position-persistence', version='0.3.0') 
[org.occurrent/subscription-util-blocking-automatic-position-persistence "0.3.0"]
'org.occurrent:subscription-util-blocking-automatic-position-persistence:jar:0.3.0'
<dependency org="org.occurrent" name="subscription-util-blocking-automatic-position-persistence" rev="0.3.0" />

Then we should instantiate a PositionAwareBlockingSubscription, that subscribes to the events from the event store, and an instance of a BlockingSubscriptionPositionStorage, that stores the subscription position, and combine them to a BlockingSubscriptionWithAutomaticPositionPersistence:

// Create the non-durable blocking subscription instance 
PositionAwareBlockingSubscription nonDurableSubscription = ...
// Create the storage
BlockingSubscriptionPositionStorage storage = ...

// Now combine the non-durable subscription and the subscription position storage
BlockingSubscription durableSubscription = new BlockingSubscriptionWithAutomaticPositionPersistence(nonDurableSubscription, storage);

// Start a subscription
durableSubscription.subscribe("mySubscriptionId", cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent)); 
// Create the non-durable blocking subscription instance 
val nonDurableSubscription : PositionAwareBlockingSubscription = ...
// Create the storage
val storage : BlockingSubscriptionPositionStorage = ...

// Now combine the non-durable subscription and the subscription position storage
val durableSubscription : BlockingSubscription = new BlockingSubscriptionWithAutomaticPositionPersistence(nonDurableSubscription, storage)

// Start a subscription
durableSubscription.subscribe("mySubscriptionId") { cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent) };

Catch-up Subscription (Blocking)

When starting a new subscription it’s often useful to first replay historic events to get up-to-speed and then subscribing to new events as the arrive. A catch-up subscription allows for exactly this! It combines the EventStoreQueries API with a subscription and an optional subscription storage. It starts off by streaming historic events from the event store and then automatically switch to continuous streaming mode once the historic events have caught up.

To get start you need to add the following dependency:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-util-blocking-catchup-subscription</artifactId>
    <version>0.3.0</version>
</dependency>
compile 'org.occurrent:subscription-util-blocking-catchup-subscription:0.3.0'
libraryDependencies += "org.occurrent" % "subscription-util-blocking-catchup-subscription" % "0.3.0"
@Grab(group='org.occurrent', module='subscription-util-blocking-catchup-subscription', version='0.3.0') 
[org.occurrent/subscription-util-blocking-catchup-subscription "0.3.0"]
'org.occurrent:subscription-util-blocking-catchup-subscription:jar:0.3.0'
<dependency org="org.occurrent" name="subscription-util-blocking-catchup-subscription" rev="0.3.0" />

For example:

// Create the subscription position storage. Note that if PositionAwareBlockingSubscription
// is also storing the position, it's highly recommended to share the same BlockingSubscriptionPositionStorage instance.     
BlockingSubscriptionPositionStorage storage = ...
// Create the subscription instance that will be used once the replay has caught up the latest event position
PositionAwareBlockingSubscription continuousSubscription = ...
// Instantiate an event store that implements the EventStoreQueries API
EventStoreQueries eventStoreQueries = ... 


// Now combine the continuous subscription and the subscription position storage to allow
// handing over to the continuous subscription once catch-up phase is completed.
// In this example, we also store the subscription position during catch-up for every third event.
// This is optional, but useful if you're reading a lot of events and don't want to risk restarting 
// from the beginning if the application where to crash during the catch-up phase.   
CatchupSupportingBlockingSubscription catchupSubscription = new CatchupSupportingBlockingSubscription(continuousSubscription, eventStoreQueries, 
            new CatchupSupportingBlockingSubscriptionConfig(useSubscriptionPositionStorage(storage)
                    .andPersistSubscriptionPositionDuringCatchupPhaseForEveryNEvents(3));

// Start a subscription that starts replaying events of type "GameEnded" from the beginning of time
catchupSubscription.subscribe("mySubscription", filter(type("GameEnded")), StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime()), cloudEvent -> System.out.println(cloudEvent));

// Note that excluding "StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime())" like below would still start at 
// the beginnning of time the first time, but on subsequent calls will start from the latest position storing the in the storage.
// This is recommended if you want to continue using the CatchupSupportingBlockingSubscription later when no catch-up is required
// (since the subscription has already caught up).
catchupSubscription.subscribe("mySubscription", filter(type("GameEnded")), cloudEvent -> System.out.println(cloudEvent));

// is also storing the position, it's highly recommended to share the same BlockingSubscriptionPositionStorage instance.     
val storage : BlockingSubscriptionPositionStorage = ...
// Create the subscription instance that will be used once the replay has caught up the latest event position
val continuousSubscription : PositionAwareBlockingSubscription= ...
// Instantiate an event store that implements the EventStoreQueries API
val eventStoreQueries : EventStoreQueries = ... 


// Now combine the continuous subscription and the subscription position storage to allow
// handing over to the continuous subscription once catch-up phase is completed.
// In this example, we also store the subscription position during catch-up for every third event.
// This is optional, but useful if you're reading a lot of events and don't want to risk restarting 
// from the beginning if the application where to crash during the catch-up phase. If you don't
// use "andPersistSubscriptionPositionDuringCatchupPhaseForEveryNEvents" but just "useSubscriptionPositionStorage(storage)"
// then the CatchupSupportingBlockingSubscription will still start from the position in the storage, but not write to it.
// The continuous subscription (passed as first parameter to CatchupSupportingBlockingSubscription) might write to the store, 
// which means that once the CatchupSupportingBlockingSubscription has caught up and the continuous subscription starts
// writing the position, the CatchupSupportingBlockingSubscription will just delegate to continuous subscription if it finds
// a position in the storage.           
val catchupSubscription = CatchupSupportingBlockingSubscription(continuousSubscription, eventStoreQueries, 
            CatchupSupportingBlockingSubscriptionConfig(useSubscriptionPositionStorage(storage)
                    .andPersistSubscriptionPositionDuringCatchupPhaseForEveryNEvents(3))

// Start a subscription that starts replaying events of type "GameEnded" from the beginning of time
catchupSubscription.subscribe("mySubscription", filter(type("GameEnded")), StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime())) { cloudEvent -> 
    println(cloudEvent)
}

// Note that excluding "StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime())" like below would still start at 
// the beginnning of time the first time, but on subsequent calls will start from the latest position storing the in the storage.
// This is recommended if you want to continue using the CatchupSupportingBlockingSubscription later when no catch-up is required
// (since the subscription has already caught up).
catchupSubscription.subscribe("mySubscription", filter(type("GameEnded"))) { cloudEvent -> 
    println(cloudEvent)
}

CatchupSupportingBlockingSubscription maintains an in-memory cache of event ids. The size of this cache is configurable using a CatchupSupportingBlockingSubscriptionConfig but it defaults to 100. The purpose of the cache is to reduce the likely hood of duplicate events when switching from replay mode to continuous mode. Otherwise, there would be a chance that event written exactly when the switch from replay mode to continuous mode takes places, can be lost. To prevent this, the continuous mode subscription starts at a position before the last event read from the history. The purpose of the cache is thus to filter away events that are detected as duplicates during the switch. If the cache is too small, duplicate events will be sent to the continuous subscription. Typically, you want your application to be idempotent anyways and if so this shouldn’t be a problem.

CatchupSupportingBlockingSubscription can be configured to store the subscription position in the supplied storage (see example above) so that, if the application crashes during replay mode, it doesn’t need to start replaying from the beginning again. Note that if you don’t want subscription persistence during replay, you can disable this by doing new CatchupSupportingBlockingSubscriptionConfig(dontUseSubscriptionPositionStorage()).

Reactive Subscriptions

A “reactive subscription” is a subscription that uses non-blocking IO when reading events from the event store, i.e. reading changes from an EventStore will not block a thread. It uses concepts from reactive programming which is well-suited for working with streams of data. This is arguably a bit more complex for the typical Java developer, and you should consider using blocking subscriptions if high throughput, low CPU and memory-consumption is not critical.

All reactive subscriptions implements the org.occurrent.subscription.api.reactor.ReactorSubscription interface which uses components from project reactor. This interface provide means to subscribe to new events from an EventStore as they are written. For example:

subscription.subscribe("mySubscriptionId").doOnNext(System.out::println).subscribe();
subscription.subscribe("mySubscriptionId").doOnNext(::println).subscribe()
The "subscribe" method returns an instance of "Flux<CloudEvent>".

This will simply print each cloud event written to the event store to the console.

Note that the signature of subscribe is defined like this:

public interface ReactorSubscription {

    /**
     * Stream events from the event store as they arrive. Use this method if want to start streaming from a specific position.
     *
     * @return A Flux with cloud events which may also includes the SubscriptionPosition that can be used to resume the stream from the current position.
     */
    Flux<CloudEvent> subscribe(SubscriptionFilter filter, StartAt startAt);

    // Default methods 

}

It’s common that subscriptions produce “wrappers” around the vanilla io.cloudevents.CloudEvent type that includes the subscription position (if the datastore doesn’t maintain the subscription position on behalf of the clients). Someone, either you as the client or the datastore, needs to keep track of this position for each individual subscriber (“mySubscriptionId” in the example above). If the datastore doesn’t provide this feature, you should use a ReactorSubscription implementation that also implement the org.occurrent.subscription.api.reactor.PositionAwareReactorSubscription interface. The PositionAwareReactorSubscription is an example of a ReactorSubscription that returns a wrapper around io.cloudevents.CloudEvent called org.occurrent.subscription.PositionAwareCloudEvent which adds an additional method, SubscriptionPosition getStreamPosition(), that you can use to get
the current subscription position. You can check if a cloud event contains a subscription position by calling PositionAwareCloudEvent.hasSubscriptionPosition(cloudEvent) and then get the position by using PositionAwareCloudEvent.getSubscriptionPositionOrThrowIAE(cloudEvent). Note that PositionAwareCloudEvent is fully compatible with io.cloudevents.CloudEvent and it’s ok to treat it as such. So given that you’re subscribing from a PositionAwareReactorSubscription, you are responsible for keeping track of the subscription position, so that it’s possible to resume this subscription from the last known position on application restart. This interface also provides means to get the so called “current global subscription position”, by calling the globalSubscriptionPosition method which can be useful when starting a new subscription.

For example, consider the case when subscription “A” starts subscribing at the current time (T1). Event E1 is written to the EventStore and propagated to subscription “A”. But imagine there’s a bug in “A” that prevents it from performing its action. Later, the bug is fixed and the application is restarted at the “current time” (T2). But since T2 is after T1, E1 will not sent to “A” again since it happened before T2. Thus this event is missed! Whether or not this is actually a problem depends on your use case. But to avoid it you should not start the subscription at the “current time”, but rather from the “global subscription position”. This position should be written to a subscription position storage before subscription “A” is started. Thus the subscription can continue from this position on application restart and no events will be missed.

Reactive Subscription Filters

You can also provide a subscription filter, applied at the datastore level so that it’s really efficient, if you’re only interested in certain events:

subscription.subscribe("mySubscriptionId", filter(type("GameEnded"))).doOnNext(System.out::println).subscribe();
subscription.subscribe("mySubscriptionId", filter(type("GameEnded")).doOnNext(::println).subscribe()

This will print each cloud event written to the event store, and has type equal to “GameEnded”, to the console. The filter method is statically imported from org.occurrent.subscription.OccurrentSubscriptionFilter and type is statically imported from org.occurrent.condition.Condition. The OccurrentSubscriptionFilter is generic and should be applicable to a wide variety of different datastores. However, subscription implementations may provide different means to express filters. For example, the MongoDB subscription implementations allows you to use filters specific to MongoDB:

subscription.subscribe("mySubscriptionId", filter().id(Filters::eq, "3c0364c3-f4a7-40d3-9fb8-a4a62d7f66e3").type(Filters::eq, "GameStarted")).doOnNext(System.out::println).subscribe();
subscription.subscribe("mySubscriptionId", filter().id(Filters::eq, "3c0364c3-f4a7-40d3-9fb8-a4a62d7f66e3").type(Filters::eq, "GameStarted")).doOnNext(::println).subscribe()

Now filter is statically imported from org.occurrent.subscription.mongodb.MongoDBFilterSpecification and Filters is imported from com.mongodb.client.model.Filters (i.e the normal way to express filters in MongoDB). However, it’s recommended to always start with an OccurrentSubscriptionFilter and only pick a more specific implementation if you cannot express your filter using the capabilities of OccurrentSubscriptionFilter.

Reactive Subscription Start Position

A subscription can can be started at different locations in the event store. You can define where to start when a subscription is started. This is done by supplying a org.occurrent.subscription.StartAt instance. It provides two ways to specify the start position, either by using StartAt.now() (default if StartAt is not defined when calling the subscribe function), or StartAt.subscriptionPosition(<subscriptionPosition>), where <subscriptionPosition> is a datastore-specific implementation of the org.occurrent.subscription.SubscriptionPosition interface which provides the start position as a String. You may want to store the String returned by a SubscriptionPosition in a database so that it’s possible to resume a subscription from the last processed position on application restart. You can do this anyway you like, but for most cases you probably should consider if there’s a Subscription Position Storage available that suits your needs. If not, you can still have a look at them for inspiration on how to write your own.

Reactive Subscription Position Storage

It’s very common that an application needs to start at its last known location in the subscription stream when it’s restarted. While you’re free to store the subscription position provided by a reactive subscription any way you like, Occurrent provides an interface called org.occurrent.subscription.api.reactor.ReactorSubscriptionPositionStorage acts as a uniform abstraction for this purpose. A ReactorSubscriptionPositionStorage is defined like this:

public interface ReactorSubscriptionPositionStorage {
    Mono<SubscriptionPosition> read(String subscriptionId);
    Mono<SubscriptionPosition> save(String subscriptionId, SubscriptionPosition subscriptionPosition);
    Mono<Void> delete(String subscriptionId);
}

I.e. it’s a way to read/write/delete the SubscriptionPosition for a given subscription. Occurrent ships one pre-defined reactive implementation (please contribute!):

1. SpringReactorSubscriptionPositionStorageForMongoDB
Uses the vanilla MongoDB Java (sync) driver to store SubscriptionPosition’s in MongoDB.

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-mongodb-spring-reactor-position-storage</artifactId>
    <version>0.3.0</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-spring-reactor-position-storage:0.3.0'
libraryDependencies += "org.occurrent" % "subscription-mongodb-spring-reactor-position-storage" % "0.3.0"
@Grab(group='org.occurrent', module='subscription-mongodb-spring-reactor-position-storage', version='0.3.0') 
[org.occurrent/subscription-mongodb-spring-reactor-position-storage "0.3.0"]
'org.occurrent:subscription-mongodb-spring-reactor-position-storage:jar:0.3.0'
<dependency org="org.occurrent" name="subscription-mongodb-spring-reactor-position-storage" rev="0.3.0" />

If you want to roll your own implementation (feel free to contribute to the project if you do) you can depend on the “reactive subscription API” which contains the ReactorSubscriptionPositionStorage interface:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-api-reactor</artifactId>
    <version>0.3.0</version>
</dependency>
compile 'org.occurrent:subscription-api-reactor:0.3.0'
libraryDependencies += "org.occurrent" % "subscription-api-reactor" % "0.3.0"
@Grab(group='org.occurrent', module='subscription-api-reactor', version='0.3.0') 
[org.occurrent/subscription-api-reactor "0.3.0"]
'org.occurrent:subscription-api-reactor:jar:0.3.0'
<dependency org="org.occurrent" name="subscription-api-reactor" rev="0.3.0" />

Reactive Subscription Implementations

These are the non-durable reactive subscription implementations:

MongoDB

It's important to recognize that MongoDB subscriptions are using the oplog so you need to make sure that you have enough oplog capacity to support your use case. You can also read more about this here. Typically this shouldn't be a problem, but if you have subscribers that risk falling behind, you may consider piping the events to e.g. Kafka and leverage it for these types of subscriptions. Note that you can always use a catch-up subscription to recover if this happens.

By “non-durable” we mean implementations that doesn’t store the subscription position in a durable storage automatically.
It might be that the datastore does this automatically or that subscription position storage is not required for your use case. If the datastore doesn’t support storing the subscription position automatically, a subscription will typically implement the org.occurrent.subscription.api.reactor.PositionAwareReactorSubscription interface (since these types of subscriptions doesn’t need to be aware of the position). However you can do this anyway you like.

Typically, if you want the stream to continue where it left off on application restart you want to store away the subscription position. You can do this anyway you like, but for most cases you probably want to look into implementations of org.occurrent.subscription.api.reactor.ReactorSubscriptionPositionStorage. These subscriptions can be combined with a subscription position storage implementation to store the position in a durable datastore.

Occurrent provides a utility that combines a PositionAwareReactorSubscription and a ReactorSubscriptionPositionStorage (see here) to automatically store the subscription position
after each processed event. If you don’t want the position to be persisted after every event, it’s recommended to pick a subscription position storage implementation, and store the position yourself when you find fit.

Reactive Subscription using Spring ReactiveMongoTemplate

An implementation that uses Spring’s ReactiveMongoTemplate for event subscriptions.

First include the following dependency:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-mongodb-spring-reactor</artifactId>
    <version>0.3.0</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-spring-reactor:0.3.0'
libraryDependencies += "org.occurrent" % "subscription-mongodb-spring-reactor" % "0.3.0"
@Grab(group='org.occurrent', module='subscription-mongodb-spring-reactor', version='0.3.0') 
[org.occurrent/subscription-mongodb-spring-reactor "0.3.0"]
'org.occurrent:subscription-mongodb-spring-reactor:jar:0.3.0'
<dependency org="org.occurrent" name="subscription-mongodb-spring-reactor" rev="0.3.0" />

Then create a new instance of SpringReactorSubscriptionForMongoDB and start subscribing:

ReactiveMongoTemplate reactiveMongoTemplate = ...
// Create the blocking subscription
BlockingSubscription subscriptions = new SpringReactorSubscriptionForMongoDB(reactiveMongoTemplate, "eventCollectionName", TimeRepresentation.RFC_3339_STRING);

// Now you can create subscriptions instances that subscribes to new events as they're written to an EventStore
// Typically you do this after the Spring application context has finished loading. For example by subscribing to 
// the  (org.springframework.boot.context.event.ApplicationStartedEvent) or in a method annotated with (@PostConstruct) 
subscriptions.subscribe("mySubscriptionId").flatMap(cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent)).subscribe(); 

// You can later cancel the subscription by calling:
subscriptions.cancelSubscription("mySubscriptionId");
val reactiveMongoTemplate : ReactiveMongoTemplate = ... 
// Create the blocking subscription
val subscriptions = SpringReactorSubscriptionForMongoDB(reactiveMongoTemplate, "eventCollectionName", TimeRepresentation.RFC_3339_STRING)

// Now you can create subscriptions instances that subscribes to new events as they're written to an EventStore
subscriptions.subscribe("mySubscriptionId").flatMap { cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent) }.subscribe()

// You can later cancel the subscription by calling:
subscriptions.cancelSubscription("mySubscriptionId")
SpringReactorSubscriptionForMongoDB can be imported from the "org.occurrent.subscription.mongodb.spring.reactor" package.

The “eventCollectionName” specifies the event collection in MongoDB where events are stored. It’s important that this collection is the same as the collection used by the EventStore implementation. Secondly, we have the TimeRepresentation.RFC_3339_STRING that is passed as the third constructor argument, which you can read more about here. It’s also very important that this is configured the same way as the EventStore.

It should also be noted that Spring takes care of re-attaching to MongoDB if there’s a connection issue or other transient errors. This can be configured when creating the ReactiveMongoTemplate instance.

Note that you can provide a filter, start position and position persistence for this subscription implementation.

Automatic Subscription Position Persistence (Reactive)

Storing the subscription position is useful if you need to resume a subscription from its last known position when restarting an application. Occurrent provides a utility that combines a PositionAwareReactorSubscription and a ReactorSubscriptionPositionStorage implementation (see here) to automatically store the subscription position, by default,
after each processed event. If you don’t want the position to be persisted after every event, you can control how often this should happen by supplying a predicate to ReactorSubscriptionWithAutomaticPositionPersistenceConfig. There’s a pre-defined predicate, org.occurrent.subscription.util.predicate.EveryN, that allow
the position to be stored for every n event instead of simply every event. There’s also a shortcut, e.g. new ReactorSubscriptionWithAutomaticPositionPersistenceConfig(3) that creates an instance of EveryN that stores the position for every third event.

To use it, first to add the following dependency:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-util-reactor-automatic-position-persistence</artifactId>
    <version>0.3.0</version>
</dependency>
compile 'org.occurrent:subscription-util-reactor-automatic-position-persistence:0.3.0'
libraryDependencies += "org.occurrent" % "subscription-util-reactor-automatic-position-persistence" % "0.3.0"
@Grab(group='org.occurrent', module='subscription-util-reactor-automatic-position-persistence', version='0.3.0') 
[org.occurrent/subscription-util-reactor-automatic-position-persistence "0.3.0"]
'org.occurrent:subscription-util-reactor-automatic-position-persistence:jar:0.3.0'
<dependency org="org.occurrent" name="subscription-util-reactor-automatic-position-persistence" rev="0.3.0" />

Then we should instantiate a PositionAwareReactorSubscription, that subscribes to the events from the event store, and an instance of a ReactorSubscriptionPositionStorage, that stores the subscription position, and combine them to a ReactorSubscriptionWithAutomaticPositionPersistence:

// Create the non-durable blocking subscription instance 
PositionAwareReactorSubscription nonDurableSubscription = ...
// Create the storage
ReactorSubscriptionPositionStorage storage = ...

// Now combine the non-durable subscription and the subscription position storage
ReactorSubscriptionWithAutomaticPositionPersistence durableSubscription = new ReactorSubscriptionWithAutomaticPositionPersistence(nonDurableSubscription, storage);

// Start a subscription
durableSubscription.subscribe("mySubscriptionId", cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent)).subscribe(); 
// Create the non-durable blocking subscription instance 
val nonDurableSubscription : PositionAwareReactorSubscription = ...
// Create the storage
val storage : ReactorSubscriptionPositionStorage = ...

// Now combine the non-durable subscription and the subscription position storage
val durableSubscription = new BlockingSubscriptionWithAutomaticPositionPersistence(nonDurableSubscription, storage)

// Start a subscription
durableSubscription.subscribe("mySubscriptionId") { cloudEvent -> 
    doSomethingWithTheCloudEvent(cloudEvent) 
}.subscribe()

Blogs

Johan has created a couple of blog-posts on Occurrent on his blog:

  1. Occurrent - Event Sourcing for the JVM

Contact & Support

Would you like to contribute to Occurrent? That’s great! You should join the mailing-list or contribute on github. The mailing-list can also be used for support and discussions.

Credits

Thanks to Per Ökvist for discussions and ideas, and David Åse for letting me fork the awesome javalin website.