Fork me on GitHub

Documentation

The documentation on this page is always for the latest version of Occurrent, currently 0.1.1.

If you like Occurrent, please consider starring us on GitHub: Like Occurrent? Star us on GitHub:

Introduction

Occurrent is in an early stage so API's, and even the data model, are subject to change in the future.

Occurrent is an event sourcing library, or if you wish, a set of event sourcing utilities for the JVM, created by Johan Haleby. There are many options for doing event sourcing in Java already so why build another one? There are a few reasons for this besides the intrinsic joy of doing something yourself:

Concepts

Event Sourcing

Every system needs to store and update data somehow. Many times this is done by storing the current state of an entity in the database. For example, you might have an entity called Order stored in a order table in a relational database. Everytime something happens to the order, the table is updated with the new information and replacing the previous values. Event Sourcing is a technique that instead stores the changes, represented by events, that occurred for the entity. Events are facts, things that have happened, and they should never be updated. This means that not only can you derive the current state from the set of historic events, but you also know which steps that were involved to reach the current state.

EventStore

The event store is a place where you store events. Events are immutable pieces of data describing state changes for a particular stream. A stream is a collection of events that are related, typically but not limited to, a particular entity. For example a stream may include all events for a particular instance of a game or an order.

Occurrent provides an interface, EventStore, that allows to read and write events from the database. The EventStore interface is actually composed of various smaller interfaces since not all databases supports all aspects provided by the EventStore interface. Here’s an example that writes a cloud event to the event store and read it back:

CloudEvent event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }")
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event));

// Read
EventStream<CloudEvent> eventStream = eventStore.read("streamId");
val event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }")
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event))

// Read
val eventStream : EventStream<CloudEvent> = eventStore.read("streamId")

Note that when reading the events, the EventStore won’t simply return a Stream of CloudEvent’s, instead it returns a wrapper called EventStream.

EventStream

The EventStream contains the CloudEvent’s for a stream and the version of the stream. The version can be used to guarantee that only one thread/process is allowed to write to the stream at the same time, i.e. optimistic locking. This can be achieved by including the version in a write condition.

Note that reading a stream that doesn’t exist (e.g. eventStore.read("non-existing-id") will return an instance of EventStream with an empty stream of events and 0 as version number. The reason for this is that you can use the same “application service” (a fancy word for a piece of code that loads events from the event store, applies them to the domain model and writes the new events returned to the event store) for both entity creation and subsequent use cases. For example consider this simple domain model:

public class WordGuessingGame {
	public static Stream<CloudEvent> startNewGame(String gameId, String wordToGuess) {	
		...
	}

	public static Stream<CloudEvent> guessWord(Stream<CloudEvent> eventStream, String word) {
		...
	}
}
// Note that the functions could might as well be placed directly in a package 
 object WordGuessingGame {
    fun startNewGame(gameId : String, wordToGuess : String) : Stream<CloudEvent> = ...	
 
    fun guessWord(eventStream : Stream<CloudEvent>, word : String) : Stream<CloudEvent> = ...
 }

Then we could write a generic application service that takes a higher-order function (Stream<CloudEvent>) -> Stream<CloudEvent>:

public class ApplicationService {

    private final EventStore eventStore;

    public ApplicationService(EventStore eventStore) {
        this.eventStore = eventStore;
    }

    public void execute(String streamId, Function<Stream<CloudEvent>, Stream<CloudEvent>> functionThatCallsDomainModel) {
        // Read all events from the event store for a particular stream
        EventStream<CloudEvent> eventStream = eventStore.read(streamId);

        // Invoke the domain model  
        Stream<CloudEvent> newEvents = functionThatCallsDomainModel.apply(eventStream.events());

        // Persist the new events  
        eventStore.write(streamId, eventStream.version(), newEvents);
    }
}
class ApplicationService constructor (val eventStore : EventStore) {

    fun execute(streamId : String, functionThatCallsDomainModel : (Stream<CloudEvent>) -> Stream<CloudEvent>) {
        // Read all events from the event store for a particular stream
        val  eventStream : EventStream<CloudEvent> = eventStore.read(streamId)
        
         // Invoke the domain model 
        val newEvents = functionThatCallsDomainModel(eventStream.events())

        // Persist the new events
        eventStore.write(streamId, eventStream.version(), newEvents)
    }
}
Note that typically the domain model, WordGuessingGame in this example, would not return CloudEvents but rather a stream or list of a custom data structure, domain events, that would then be converted to CloudEvent's. This is not shown in this example above for brevity.

You could then call the application service like this regardless of you’re starting a new game or not:

// Here we image that we have received the data required to start a new game, e.g. from a REST endpoint. 
String gameId = ...
String wordToGuess = ...;

// Then we invoke the application service to start a game:
applicationService.execute(gameId, __ -> WordGuessingGame.startNewGame(gameId, wordToGuess));  

// Later a player guess a word:
String gameId = ...
String guess = ...;

// We thus invoke the application service again to guess the word:
applicationService.execute(gameId, events -> WordGuessingGame.guessWord(events, gameId, guess));
// Here we image that we have received the data required to start a new game, e.g. from a REST endpoint. 
val gameId : String = ...
val wordToGuess : String = ...;

// Then we invoke the application service to start a game:
applicationService.execute(gameId) { 
    WordGuessingGame.startNewGame(gameId, wordToGuess)
}  

// Later a player guess a word:
val gameId : String = ...
val guess : String = ...;

// We thus invoke the application service again to guess the word:
applicationService.execute(gameId, events -> WordGuessingGame.guessWord(events, gameId, guess));

Writing application services like this is both powerful and simple (once you get used to it). There’s less need for explicit commands and command handlers (the application service is a kind of command handler). You can also use other functional techniques such as partial application to make the code look, arguably, even nicer. It’s also easy to compose several calls to the domain model into one by using standard functional composition techniques. For example in this case you might consider both starting a game and let the player make her first guess from a single request to the REST API. No need to change the domain model to do this, just use function composition.

Write Condition

A “write condition” can be used to specify conditional writes to the event store. Typically, the purpose of this would be to achieve optimistic concurrent control (optimistic locking) of an event stream.

For example, image you have an Account to which you can deposit and withdraw money. A business rule says that it’s not allowed to have a negative balance on an account. Now imagine an account that is shared between two persons and contains 20 EUR. Person “A” wants to withdraw 15 EUR and person “B” wants to withdraw 10 EUR. If they try to do this, an error message should be presented to one of them since the account balance would be negative. But what happens if both persons try to withdraw the money at the same time? Let’s have a look:

// Person A at _time 1_
EventStream<CloudEvent> eventStream = eventStore.read("account1"); // A

// "withdraw" is a pure function in the Account domain model which takes a Stream
//  of all current events and the amount to withdraw, and returns new events. 
// In this case, a "MoneyWasWithdrawn" event is returned,  since 15 EUR is OK to withdraw.     
Stream<CloudEvent> events = Account.withdraw(eventStream.events(), Money.of(15, EUR));

// We write the new events to the event store  
eventStore.write("account1", events);

// Now in a different thread let's imagine Person B at _time 1_
EventStream<CloudEvent> eventStream = eventStore.read("account1"); // B

// Again we want to withdraw money, and the system will think this is OK, 
// since event streams for A and B has not yet recorded that the balance is negative.   
Stream<CloudEvent> events = Account.withdraw(eventStream.events(), Money.of(10, EUR));

// We write the new events to the event store without any problems! 😱 
// But this shouldn't work since it would violate the business rule!   
eventStore.write("account1", events);
// Person A at _time 1_
val eventStream = eventStore.read("account1") // A

// "withdraw" is a pure function in the Account domain model which takes a Stream
//  of all current events and the amount to withdraw. It returns a stream of 
// new events, in this case only a "MoneyWasWithdrawn" event,  since 15 EUR is OK to withdraw.     
val events = Account.withdraw(eventStream.events(), Money.of(15, EUR))

// We write the new events to the event store  
eventStore.write("account1", events)

// Now in a different thread let's imagine Person B at _time 1_
val eventStream = eventStore.read("account1") // B

// Again we want to withdraw money, and the system will think this is OK, 
// since the Account thinks that 10 EUR will have a balance of 10 EUR after 
// the withdrawal.   
val events = Account.withdraw(eventStream.events(), Money.of(10, EUR))

// We write the new events to the event store without any problems! 😱 
// But this shouldn't work since it would violate the business rule!   
eventStore.write("account1", events)
Note that typically the domain model, Account in this example, would not return CloudEvents but rather a stream or list of a custom data structure, domain events, that would then be converted to CloudEvent's. This is not shown in the example above for brevity, look at the command section for a more real-life example.

To avoid the problem above we want to make use of conditional writes. Let’s see how:

// Person A at _time 1_
EventStream<CloudEvent> eventStream = eventStore.read("account1"); // A
long currentVersion = eventStream.version(); 

// Withdraw money
Stream<CloudEvent> events = Account.withdraw(eventStream.events(), Money.of(15, EUR));

// We write the new events to the event store with a write condition that implies
// that the version of the event stream must be A.   
eventStore.write("account1", currentVersion, events);

// Now in a different thread let's imagine Person B at _time 1_
EventStream<CloudEvent> eventStream = eventStore.read("account1"); // A 
long currentVersion = eventStream.version();

// Again we want to withdraw money, and the system will think this is OK, 
// since event streams for A and B has not yet recorded that the balance is negative.   
Stream<CloudEvent> events = Account.withdraw(eventStream.events(), Money.of(10, EUR));

// We write the new events to the event store with a write condition that implies
// that the version of the event stream must be B. But now Occurrent will throw
// a "org.occurrent.eventstore.api.WriteConditionNotFulfilledException" since, in this
// case A was slightly faster, and the version of the event stream no longer match!
// The entire operation should be retried for person B and when "Account.withdraw(..)"
// is called again it could throw a "CannotWithdrawSinceBalanceWouldBeNegative" exception. 
eventStore.write("account1", currentVersion, events); 
// Person A at _time 1_
val eventStream = eventStore.read("account1") // A
val currentVersion = eventStream.version() 

// Withdraw money
val events = Account.withdraw(eventStream.events(), Money.of(15, EUR));

// We write the new events to the event store with a write condition that implies
// that the version of the event stream must be A.   
eventStore.write("account1", currentVersion, events)

// Now in a different thread let's imagine Person B at _time 1_
val eventStream = eventStore.read("account1"); // A 
val currentVersion = eventStream.version()

// Again we want to withdraw money, and the system will think this is OK, 
// since event streams for A and B has not yet recorded that the balance is negative.   
val events = Account.withdraw(eventStream.events(), Money.of(10, EUR))

// We write the new events to the event store with a write condition that implies
// that the version of the event stream must be B. But now Occurrent will throw
// a "org.occurrent.eventstore.api.WriteConditionNotFulfilledException" since, in this
// case A was slightly faster, and the version of the event stream no longer match!
// The entire operation should be retried for person B and when "Account.withdraw(..)"
// is called again it could throw a "CannotWithdrawSinceBalanceWouldBeNegative" exception. 
eventStore.write("account1", currentVersion, events) 

What you’ve seen above is a simple, but widely used, form of write condition. Actually, doing eventStore.write("streamId", version, events) is just a shortcut for:

eventStore.write("streamId", WriteCondition.streamVersionEq(version), events);
eventStore.write("streamId", WriteCondition.streamVersionEq(version), events)
WriteCondition can be imported from "org.occurrent.eventstore.api.WriteCondition".

But you can compose a more advanced write condition using a Condition:

eventStore.write("streamId", WriteCondition.streamVersion(and(lt(10), ne(5)), events);
eventStore.write("streamId", WriteCondition.streamVersion(and(lt(10), ne(5)), events)

where lt, ne and and is statically imported from org.occurrent.condition.Condition.

EventStore Queries

Since Occurrent builds on-top of existing databases it’s ok, given that you know what you’re doing*, to use the strengths of these databases. One such strength is that databases typically have good querying support. Occurrent exposes this with the EventStoreQueries interface that an EventStore implementation may implement to expose querying capabilities. For example:

OffsetDateTime lastTwoHours = OffsetDateTime.now().minusHours(2); 
// Query the database for all events the last two hours that have "subject" equal to "123" and sort these in descending order
Stream<CloudEvent> events = eventStore.query(time(lte(lastTwoHours)).and(subject("123")), SortBy.TIME_DESC);
val lastTwoHours = OffsetDateTime.now().minusHours(2);
// Query the database for all events the last two hours that have "subject" equal to "123" and sort these in descending order
val events : Stream<CloudEvent> = eventStore.query(time(lte(lastTwoHours)).and(subject("123")), SortBy.TIME_DESC)
*There's a trade-off when it's appropriate to query the database vs creating materialized views/projections and you should most likely create indexes to allow for fast queries.

The time and subject methods are statically imported from org.occurrent.filter.Filter and lte is statically imported from org.occurrent.condition.Condition.

EventStoreQueries is not bound to a particular stream, rather you can query any stream (or multiple streams at the same time). It also provides the ability to get an “all” stream:

// Return all events in an event store sorted by descending order
Stream<CloudEvent> events = eventStore.all(SortBy.TIME_DESC);
// Return all events in an event store sorted by descending order
val events : Stream<CloudEvent> = eventStore.all(SortBy.TIME_DESC)

The EventStoreQueries interface also supports skip and limit capabilities which allows for pagination:

// Skip 42, limit 1024
Stream<CloudEvent> events = eventStore.all(42, 1024);
// Skip 42, limit 1024
val events : Stream<CloudEvent> = eventStore.all(42, 1024)

To get started with an event store refer to Choosing An EventStore.

Subscriptions

A subscription is a way get notified when new events are written to an event store. Typically, a subscription will forward the event to another piece of infrastructure such as a message bus, or to create views from the events (such as projections, sagas, snapshots etc). There are two different kinds of API’s, the first one is a blocking API represented by the BlockingSubscription interface (in the org.occurrent:subscription-api-blocking module), and second one is a reactive API represented by the ReactorSubscription interface (in the org.occurrent:subscription-api-reactor module).

The blocking API is callback based, which is fine if you’re working with individual events (you can of course write a simple function that aggregates events into batches). If you want to work with streams of data, the ReactorSubscription is probably a better option since it’s using the Flux publisher from project reactor.

Note that it’s fine to use ReactorSubscription, even though the event store is implemented using the blocking api, and vice versa. If the datastore allows it, you can also run subscriptions in a different process than the processes reading and writing to the event store.

To get started with subscriptions refer to Using Subscriptions.

EventStore Operations

Occurrent event store implementations may optionally also implement the EventStoreOperations interface. It provides means to delete a specific event, or an entire event stream. For example:

// Delete an entire event stream
eventStoreOperations.deleteEventStream("streamId");
// Delete a specific event
eventStoreOperations.deleteEvent("cloudEventId", cloudEventSource);
// Delete an entire event stream
eventStoreOperations.deleteEventStream("streamId")
// Delete a specific event
eventStoreOperations.deleteEvent("cloudEventId", cloudEventSource)

These are probably operations that you want to use sparingly. Typically, you never want to remove any events, but there are some cases, such as GDPR or other regulations, that requires the deletion of an event or an entire event stream. You should be aware that there are other ways to solve this though. One way would be to encrypt personal data and throw away the key when the user no longer uses the service. Another would be to store personal data outside the event store.

Another feature provided by EventStoreOperations is the ability to update an event. Again, this is not something you normally want to do, but it can be useful for certain strategies of GDPR compliance. For example maybe you want to remove or update personal data in an event when a users unregisters from your service. Here’s an example:

eventStoreOperations.updateEvent("cloudEventId", cloudEventSource, cloudEvent -> {
    return CloudEventBuilder.v1(cloudEvent).withData(removePersonalDetailsFrom(cloudEvent)).build();
});
eventStoreOperations.updateEvent("cloudEventId", cloudEventSource) { cloudEvent -> 
    CloudEventBuilder.v1(cloudEvent).withData(removePersonalDetailsFrom(cloudEvent)).build()
})

Views

Occurrent doesn’t have any special components for creating views/projections. Instead, you simply create a subscription in which you can create and store the view as you find fit. But this doesn’t have to be difficult! Here’s a trivial example of a view that maintains the number of ended games. It does so by inceasing the “numberOfEndedGames” field in an (imaginary) database for each “GameEnded” event that is written to the event store:

// An imaginary database API
Database someDatabase = ...
// Subscribe to all "GameEnded" events by starting a subscription named "my-view" 
// and increase "numberOfEndedGames" for each ended game.   
subscription.subscribe("my-view", filter(type("GameEnded")), cloudEvent -> someDatabase.inc("numberOfEndedGames"));        
// An imaginary database API
val someDatabase : Database = ...
// Subscribe to all "GameEnded" events by starting a subscription named "my-view" 
// and increase "numberOfEndedGames" for each ended game. 
subscription.subscribe("my-view", filter(type("GameEnded"))) {  
    someDatabase.inc("numberOfEndedGames")
}

Where filter is imported from org.occurrent.subscription.OccurrentSubscriptionFilter and type is imported from org.occurrent.condition.Condition.

While this is a trivial example it shouldn’t be difficult to create a view that is backed by a JPA entity in a relational database based on a subscription.

Commands

Occurrent doesn’t contain a built-in command bus. The reason for this is that I’m not convinced that it’s needed in a majority of cases. To send “commands” to another service (remotely) one could call a REST API or make an RPC invocation instead of using a proprietary command bus. One exception to this is if you need location transparency.

But what about internally? For example if a service exposes a REST API and upon receiving a request it publishes a command that’s somehow picked up and routed to a function in your domain model. It’s not uncommon to use a framework in which you define your domain model like this:

public class WordGuessingGame extends AggregateRoot {

    @AggregateId
    private String gameId;
    private String wordToGuess;

    @HandleCommand
    public void handle(StartNewGameCommand startNewGameCommand) {
        // Insert some validation and logic
        ... 
        // Publish an event using the "publish" method from AggregateRoot
        publish(new WordGuessingGameWasStartedEvent(...));
    }
    
    @HandleCommand
    public void handle(GuessWordCommand guessWordCommand) {
        // Some validation and implementation ...
        ...	
        
        // Publish an event using the "publish" method from AggregateRoot
        publish(new WordGuessedEvent(...));
    }

    @HandleEvent
    public void handle(WordGuessingGameWasStartedEvent e) {
        this.gameId = e.getGameId();
        this.wordToGuess = e.getWordToGuess();    
    }        

    ... 
}
 class WordGuessingGame : AggregateRoot() {
 
     @AggregateId
     var gameId : String
     var wordToGuess : String
 
     @HandleCommand
     fun handle(startNewGameCommand : StartNewGameCommand) {
         // Insert some validation and logic
         ... 
         // Publish an event using the "publish" method from AggregateRoot
         publish(WordGuessingGameWasStartedEvent(...))
     }
     
     @HandleCommand
     fun handle(guessWordCommand : GuessWordCommand) {
         // Some validation and implementation ...
         ...	
         
         // Publish an event using the "publish" method from AggregateRoot
         publish(WordGuessedEvent(...))
     }
 
     @HandleEvent
     fun handle(e : WordGuessingGameWasStartedEvent) {
         gameId = e.getGameId()
         wordToGuess = e.getWordToGuess()    
     }        
 
     ... 
 }
This is a made-up example of an imaginary event sourcing framework, it's not how you're encouraged to implement a domain model using Occurrent.

Let’s look at a “command” and see what it typically looks like:

public class StartNewGameCommand {
    
    @AggregateId
    private String gameId;
    private String wordToGuess;
    
    public void setGameId(String gameId) {
        this.gameId = gameId;
    }
    
    public String getGameId() {
        return gameId;
    }

    public void setWordToGuess(String wordToGuess) {
        this.gameId = gameId;
    }
    
    public String getWordToGuess() {
        return wordToGuess;
    }
    
    // Equals/hashcode/tostring methods are excluded for breivty
}
data class StartNewGameCommand(@AggregateId var gameId: String, val wordToGuess : String)

Now that we have our WordGuessingGame implementation and a command we can dispatch it to a command bus:

commandbus.dispatch(new StartNewGameCommand("someGameId", "Secret word"));
commandbus.dispatch(StartNewGameCommand("someGameId", "Secret word"))

From a typical Java perspective one could argue that this is not too bad. But it does have a few things one could improve upon from a broader perspective:

  1. The WordGuessingGame is complecting several things that may be modelled separately. Data, state, behavior, command- and event routing and event publishing are all defined in the same model (the WordGuessingGame class). For small examples like this it arguably doesn’t matter, but if you have complex logic and a large system, it probably will in my experience. Keeping state and behavior separate allows for easier testing, referential transparency and function composition. It allows treating the state as a value which has many benefits.
  2. Commands are defined as explicit data structures (this is not necessarily a bad thing but it will add to your code base) when arguably they don’t have to.

So how would one dispatch commands in Occurrent? There’s actually nothing stopping you from implementation a simple command bus, create explicit commands, and dispatch them the way we did in the example above. Actually it would be relatively easy to implement the imaginary framework above using Occurrent components. But if you’ve recognized the problems described above you’re probably looking for a different approach. Here’s another way you can do it. First of all let’s refactor our domain model to pure functions, without any state or dependencies to Occurrent or any other library/framework.

public class WordGuessingGame {
	public static Stream<DomainEvent> startNewGame(String gameId, String wordToGuess) {	
		...
	}

	public static Stream<DomainEvent> guessWord(Stream<DomainEvent> eventStream, String word) {
		...
	}
}
// Note that the functions could might as well be placed directly in a package.
// You might also want to use a Kotlin Sequence or List instead of a Stream
 object WordGuessingGame {
    fun startNewGame(gameId : String, wordToGuess : String) : Stream<CloudEvent> = ...	
 
    fun guessWord(eventStream : Stream<DomainEvent>, word : String) : Stream<DomainEvent> = ...
 }

If you define your behavior like this it’ll be really easy to test (and also to compose using normal function composition techniques). There are no side-effects (such as publishing events) which also allows for easier testing and local reasoning.

But where are our commands!? In this example we’ve decided to represent them as functions. I.e. the “command” is modeled as simple function, e.g. startNewGame! But wait, how can I dispatch commands to this function? Just create or copy a generic ApplicationService class like the one below if you’re using an object-oriented approach:

public class ApplicationService {

    private final EventStore eventStore;
    private final Function<CloudEvent, DomainEvent> convertCloudEventToDomainEvent;
    private final Function<DomainEvent, CloudEvent> convertDomainEventToCloudEvent;

    public ApplicationService(EventStore eventStore, 
                              Function<CloudEvent, DomainEvent> convertCloudEventToDomainEvent, 
                              Function<DomainEvent, CloudEvent> convertDomainEventToCloudEvent) {
        this.eventStore = eventStore;
        this.convertCloudEventToDomainEvent = convertCloudEventToDomainEvent;
        this.convertDomainEventToCloudEvent = convertDomainEventToCloudEvent;
    }

    public void execute(String streamId, Function<Stream<DomainEvent>, Stream<DomainEvent>> functionThatCallsDomainModel) {
        // Read all events from the event store for a particular stream
        EventStream<CloudEvent> eventStream = eventStore.read(streamId);
        // Convert the cloud events into domain events
        Stream<DomainEvent> domainEventsInStream = eventStream.events().map(convertCloudEventToDomainEvent);

        // Call a pure function from the domain model which returns a Stream of domain events  
        Stream<DomainEvent> newDomainEvents = functionThatCallsDomainModel.apply(domainEventsInStream);

        // Convert domain events to cloud events and write them to the event store  
        eventStore.write(streamId, eventStream.version(), newDomainEvents.map(convertDomainEventToCloudEvent));
    }
}
class ApplicationService constructor (val eventStore : EventStore, 
                                      val convertCloudEventToDomainEvent : (CloudEvent) -> DomainEvent, 
                                      val convertDomainEventToCloudEvent : (DomainEvent) -> CloudEvent) {

    fun execute(streamId : String, functionThatCallsDomainModel : (Stream<DomainEvent>) -> Stream<DomainEvent>) {
        // Read all events from the event store for a particular stream
        val  eventStream : EventStream<CloudEvent> = eventStore.read(streamId)
        // Convert the cloud events into domain events
        val domainEventsInStream : Stream<DomainEvent> = eventStream.events().map(convertCloudEventToDomainEvent)

        // Call a pure function from the domain model which returns a Stream of domain events
        val newDomainEvents = functionThatCallsDomainModel(domainEventsInStream)

        // Convert domain events to cloud events and write them to the event store
        eventStore.write(streamId, eventStream.version(), newDomainEvents.map(convertDomainEventToCloudEvent))
    }
}
Why is this "utility" not included in Occurrent? Maybe it will in the future, but one reason is that you might want to do small tweaks to this implementation. When using Spring, you might want to add a "@Transactional" annotation, or if you're using Kotlin you might want to take a higher-order function that returns a "Sequence<DomainEvent>" instead of "Stream<DomainEvent>" etc etc. The reasoning is that copying and pasting this peice of code into your application will not be difficult, and then you're in full control! It's also really important to point out the `EventStore` will either write all events atomically or all events will fail! You will never have partial writes.

and then use the ApplicationService like this:

// A function that converts a CloudEvent to a "domain event"
Function<CloudEvent, DomainEvent> convertCloudEventToDomainEvent = ..
// A function that a "domain event" to a CloudEvent
Function<DomainEvent, CloudEvent> convertDomainEventToCloudEvent = ..
EventStore eventStore = ..
ApplicationService applicationService = new ApplicationService(eventStore, convertCloudEventToDomainEvent, convertDomainEventToCloudEvent);

// Now in your REST API use the application service:
String gameId = ... // From a form parameter
String wordToGuess = .. // From a form parameter
applicationService.execute(gameId, events -> WordGuessingGame.startNewGame(gameId, wordToGuess));
// A function that converts a CloudEvent to a "domain event"
val convertCloudEventToDomainEvent : (CloudEvent) -> DomainEvent = ..
// A function that a "domain event" to a CloudEvent
val convertDomainEventToCloudEvent = (DomainEvent) -> CloudEvent  = ..
val eventStore : EventStore = ..
val applicationService = ApplicationService(eventStore, convertCloudEventToDomainEvent, convertDomainEventToCloudEvent);

// Now in your REST API use the application service:
val gameId = ... // From a form parameter
val wordToGuess = .. // From a form parameter
applicationService.execute(gameId) { events -> 
    WordGuessingGame.startNewGame(gameId, wordToGuess)
}

If you’re using a more functional approach you can create a function like this that represents an application service:


public class ApplicationService {

    public static void execute(EventStore eventStore, String streamId, Function<Stream<DomainEvent>, Stream<DomainEvent>> functionThatCallsDomainModel) {
        // Read all events from the event store for a particular stream
        EventStream<CloudEvent> eventStream = eventStore.read(streamId);
        Stream<CloudEvent> eventsInStream = eventStream.events();

        // Call a pure function from the domain model which returns a Stream of events  
        Stream<CloudEvent> newEvents = functionThatCallsDomainModel.apply(eventsInStream);

        // Write the new events to the event store  
        eventStore.write(streamId, eventStream.version(), newEvents);
    }
}
fun execute(eventStore : EventStore, streamId : String, functionThatCallsDomainModel : (Stream<DomainEvent>) -> Stream<DomainEvent>) {
    // Read all events from the event store for a particular stream
    val eventStream : EventStream<CloudEvent> = eventStore.read(streamId)
    val domainEventsInStream : Stream<CloudEvent> = eventStream.events()

    // Call a pure function from the domain model which returns a Stream of events
    val newEvents = functionThatCallsDomainModel(domainEventsInStream)

    // Write the new events to the event store 
    eventStore.write(streamId, eventStream.version(), newEvents)
}

and when you have instantiated an EventStore and created functions that converts a CloudEvent to a DomainEvent and vice versa:

// A function that converts a CloudEvent to a "domain event"
Function<CloudEvent, DomainEvent> convertCloudEventToDomainEvent = ..
// A function that a "domain event" to a CloudEvent
Function<DomainEvent, CloudEvent> convertDomainEventToCloudEvent = ..
// The event store
EventStore eventStore = ..
// A function that converts a CloudEvent to a "domain event"
val convertCloudEventToDomainEvent : (CloudEvent) -> DomainEvent = 
// A function that a "domain event" to a CloudEvent
val convertDomainEventToCloudEvent = (DomainEvent) -> CloudEvent  = ..
val eventStore : EventStore = ..

then you can compose these functions into a generic “application service function”:

// This example is using an imaginary FP library for Java that has methods such as "partially". 
// You might want to look into "Vavr" or "Functional Java" which includes functions like this  
BiConsumer<String, Function<Stream<DomainEvent>, Stream<DomainEvent>> applicationService = (streamId, domainFn) -> 
            partially(ApplicationService::execute, evenStore, streamId)
            .andThen( domainEventsInStream -> domainEventsInStream.map(convertCloudEventToDomainEvent))
            .andThen(domainFn)
            .andThen( newDomainEvents -> newDomainEvents.map(convertDomainEventToCloudEvent));       
// This example is using an imaginary FP library for Kotlin that has methods such as "partially" and "andThen". 
// You might want to look into the Kotlin "arrow" library which include these functions.
fun applicationService(String, (Stream<DomainEvent>) -> Stream<DomainEvent>) : Unit =  
         partially(ApplicationService::execute, evenStore, streamId)
         .andThen { domainEventsInStream -> domainEventsInStream.map(convertCloudEventToDomainEvent)) }
         .andThen(domainFn)
         .andThen { newDomainEvents -> newDomainEvents.map(convertDomainEventToCloudEvent) }

and then use the “application service function” like this:

// Now in your REST API use the application service function:
String gameId = ... // From a form parameter
String wordToGuess = .. // From a form parameter
applicationService.consume(gameId, events -> WordGuessingGame.startNewGame(gameId, wordToGuess));
// Now in your REST API use the application service function:
val gameId = ... // From a form parameter
val wordToGuess = .. // From a form parameter
applicationService(gameId) { events -> 
    WordGuessingGame.startNewGame(gameId, wordToGuess)
}

Sagas

A “saga” can be used to represent and coordinate a long-lived business transaction/process (where “long-lived” is kind of arbitrary). This is an advanced subject and you should try to avoid sagas if there are other means available to solve the problem (for example use policies if they are sufficient). Occurrent doesn’t provide or enforce any specific Saga implementation. But since Occurrent is a library you can hook in already existing solutions, for example:

The way to integrate Occurrent with any of these libraries/frameworks/patterns is to create a subscription that forwards the events written to the event store to the preferred library/framework/view.

Policy

A policy (or “reaction”) can be used to deal with workflows such as “whenever this happens, do that”. For example, whenever a game is won, send an email to the winner. For simple workflows like this there’s typically no need for a full-fledged saga. In Occurrent, you can create simple policies by creating a subscription. Let’s consider the example above:

public class WhenGameWonThenSendEmailToWinnerPolicy {

    private final BlockingSubscription<CloudEvent> subscription;
    private final EmailClient emailClient;
    private final Players players;
    
    public WhenGameWonThenSendEmailToWinnerPolicy(BlockingSubscription<CloudEvent> subscription, EmailClient emailClient, Players players) {
        this.subscription = subscription;
        this.emailClient = emailClient;
        this.players = players;
    }
    
    @PostConstruct
    public void whenGameWonThenSendEmailToWinner() {
        subscription.subscribe("whenGameWonThenSendEmailToWinnerPolicy", filter(type("GameWon")), cloudEvent -> {
            String playerEmailAddress =  players.whatIsTheEmailAddressOfPlayer(playerIdIn(cloudEvent.getData()));
            emailClient.sendEmail(playerEmailAddress, "You won, yaay!");
        }          
    }
}
class WhenGameWonThenSendEmailToWinnerPolicy(val subscription : BlockingSubscription<CloudEvent>, 
                                             val emailClient : EmailClient, val players : Players) {

    @PostConstruct
    fun whenGameWonThenSendEmailToWinner() = 
        subscription.subscribe("whenGameWonThenSendEmailToWinnerPolicy", filter(type("GameWon")) { cloudEvent -> 
            val playerEmailAddress =  players.whatIsTheEmailAddressOfPlayer(playerIdIn(cloudEvent.getData()))
            emailClient.sendEmail(playerEmailAddress, "You won, yaay!")
        }          
}

You could also create a generic policy that simply forwards all events to another piece of infrastructure. For example, you may wish to forward all events to rabbitmq (by publishing them) or Spring’s event infrastructure, and then create policies that subscribes to events from these systems instead. There’s an example in the github repository that shows an example of how one can achieve this.

You may also want to look into the “todo-list” pattern described in the automation section on the in the event modeling website.

Snapshots

Using snapshots is an advanced technique and it shouldn't be used unless it's really necessary.

Snapshotting is an optimization technique that can be applied if it takes too long to derive the current state from an event stream for each command. There are several ways to do this and Occurrent doesn’t enforce any particular strategy. One strategy is to use so called “snapshot events” (special events that contains a pre-calculated view of the state of an event stream at a particular time) and another technique is to write snapshots to another datastore than the event store.

The application service need to be modified to first load the up the snapshot and then load events that have not yet been materialized in the snapshot (if any).

Synchronous Snapshots

With Occurrent you can trade-off write speed for understandability. For example, let’s say that you want to update the snapshot on every write and it should be consistent with the writes to the event store. One way to do this is to use Spring’s transactional support:

public class ApplicationService {

    private final EventStore eventStore;
    private final SnapshotRepository snapshotRepository;
    
    public ApplicationService(EventStore eventStore,  SnapshotRepository snapshotRepository) {
        this.eventStore = eventStore;
        this.snapshotRepository = snapshotRepository;
    }
    
    @Transactional
    public void execute(String streamId, BiFunction<Snapshot, Stream<CloudEvent>, Stream<CloudEvent>> functionThatCallsDomainModel) {
        // Read snapshot from a the snapshot repsitory 
        Snapshot snapshot = snapshotRepsitory.findByStreamId(streamId);
        long snapshotVersion = snapshot.version();        
    
        // Read all events for "streamId" from snapshotVersion  
        EventStream<CloudEvent> eventStream = eventStore.read(streamId, snapshotVersion, Long.MAX_VALUE);

        // Call a pure function from the domain model which returns a Stream of new events  
        List<CloudEvent> newEvents = functionThatCallsDomainModel.apply(snapshot.state(), eventStream).collect(Collectors.toList());

        // Convert domain events to cloud events and write them to the event store  
        eventStore.write(streamId, eventStream.version(), newEvents.stream());
        
        // Update the snapshot
        Snapshot updatedSnapshot = snapshot.updateFrom(newEvents.stream(), eventStream.version());
        snapshotRepsitory.save(updatedSnapshot);
    }
}
class ApplicationService(val eventStore : EventStore, val snapshotRepository : SnapshotRepository) {
    
    @Transactional
    fun execute(String streamId, functionThatCallsDomainModel : (Snapshot, Stream<CloudEvent>) -> Stream<CloudEvent>) {
        // Read snapshot from a the snapshot repsitory 
        val snapshot : Snapshot = snapshotRepsitory.findByStreamId(streamId)
        long snapshotVersion = snapshot.streamVersion()        
    
        // Read all events for "streamId" from snapshotVersion  
        val eventStream = eventStore.read(streamId, snapshotVersion, Long.MAX_VALUE)

        // Call a pure function from the domain model which returns a Stream of new events  
        val newEvents = functionThatCallsDomainModel(snapshot.state(), eventStream).collect(Collectors.toList())

        // Convert domain events to cloud events and write them to the event store  
        eventStore.write(streamId, eventStream.version(), newEvents.stream())
        
        // Update the snapshot
        val updatedSnapshot = snapshot.updateFrom(newEvents.stream(), eventStream.version())
        snapshotRepsitory.save(updatedSnapshot)
    }
}
This is a somewhat simplified example but the idea is hopefully made clear.

It’s quite likely that you don’t need to update the snapshot in every write (like it’s done in the example above). If you only wish to update the snapshot for every third event then you could do:

if (eventStream.version() - snapshot.version() >= 3) {
    Snapshot updatedSnapshot = snapshot.updateFrom(newEvents.stream(), eventStream.version());
    snapshotRepsitory.save(updatedSnapshot);
}

Asynchronous Snapshots

As an alternative to synchronous and fully-consistent snapshots, you can update snapshots asynchronously. You do this by creating a subscription that updates the snapshot. For example:

public class UpdateSnapshotWhenNewEventsAreWrittenToEventStore {

    private final BlockingSubscription<CloudEvent> subscription;
    private final SnapshotRepository snapshotRepository;
    
    public UpdateSnapshotWhenNewEventsAreWrittenToEventStore(BlockingSubscription<CloudEvent> subscription, SnapshotRepository snapshotRepository) {
        this.subscription = subscription;
        this.snapshotRepository = snapshotRepository;
    }
    
    @PostConstruct
    public void updateSnapshotWhenNewEventsAreWrittenToEventStore() {
        subscription.subscribe("updateSnapshots", cloudEvent -> {
            String streamId = OccurrentExtensionGetter.getStreamId(cloudEvent);
            long streamVersion = OccurrentExtensionGetter.getStreamVersion(cloudEvent);
            
            Snapshot snapshot = snapshotRepository.findByStreamId(streamId);        
            snapshot.updateFrom(cloudEvent, streamVersion);
            snapshotRepository.save(snapshot);
        }          
    }
}
class UpdateSnapshotWhenNewEventsAreWrittenToEventStore(val subscription : BlockingSubscription<CloudEvent>, 
                                                        val snapshotRepository : SnapshotRepository) {

    @PostConstruct
    fun updateSnapshotWhenNewEventsAreWrittenToEventStore() {
        subscription.subscribe("updateSnapshots", { cloudEvent -> 
            String streamId = OccurrentExtensionGetter.getStreamId(cloudEvent)
            long streamVersion = OccurrentExtensionGetter.getStreamVersion(cloudEvent)
            
            Snapshot snapshot = snapshotRepository.findByStreamId(streamId)        
            snapshot.updateFrom(cloudEvent, streamVersion)
            snapshotRepository.save(snapshot)
        }          
    }
}

It’s quite likely that you don’t need to update the snapshot in every write (like it’s done in the example above). If you only wish to update the snapshot for every third event then you could do:

if (streamVersion - snapshot.version() >= 3) {
    Snapshot updatedSnapshot = snapshot.updateFrom(newEvents.stream(), eventStream.version());
    snapshotRepsitory.save(updatedSnapshot);
}

Closing the Books

This is a pattern that can be applied instead of updating snapshots for every n event. The idea is to try to keep event streams short and instead create snapshots periodically. For example, once every month we run a job that creates snapshots for certain event streams. This is especially well suited for problem domains where “closing the books” is a concept in the domain (such as accounting).

Getting started

Getting started with Occurrent involves these steps:

It's recommended to read up on CloudEvent's and its specification so that you're familiar with the structure and schema of a CloudEvent.
  1. Choose an underlying datastore for an event store. Luckily there are only two choices at the moment, MongoDB and an in-memory implementation. Hopefully this will be a more difficult decision in the future :)
  2. Once a datastore has been decided it’s time to choose an EventStore implementation for this datastore since there may be more than one.
  3. If you need subscriptions (i.e. the ability to subscribe to changes from an EventStore) then you need to pick a library that implements this for the datastore that you’ve chosen. Again, there may be several implementations to choose from.
  4. If a subscriber needs to be able to continue from where it left off on application restart, it’s worth looking into a so called “position storage” library. These libraries provide means to automatically (or selectively) store the position for a subscriber to a datastore. Note that the datastore that stores this position can be a different datastore than the one used as EventStore. For example, you can use MongoDB as EventStore but store subscription positions in Redis.

Choosing An EventStore

There are currently two different datastores to choose from, MongoDB and In-Memory.

MongoDB

Uses MongoDB, version 4.2 or above, as the underlying datastore for the CloudEvents. All implementations use transactions to guarantee consistent writes (see WriteCondition). Each EventStore will automatically create a few indexes on startup to allow for fast consistent writes, optimistic concurrency control and to avoid duplicated events. These indexes can also be used in queries against the EventStore (see EventStoreQueries).

There are three different MongoDB EventStore implementations to choose from:

MongoDB Schema

All MongoDB EventStore implementations tries to stay as close as possible to the CloudEvent’s specification even in the persitence layer. Occurrent, by default, automatically adds a custom “Occurrent extension” to each cloud event that is written to an EventStore. The Occurrent CloudEvent Extension consists of these attributes:


Attribute Name Type Description
streamId         String         An id the uniquely identifies a particular event stream.
It’s used to determine which events belong to which stream.
streamVersion Long The id of the stream version for a particular event.
It’s used for optimistic concurrency control.

A json schema describing a complete Occurrent CloudEvent, as it will be persisted to a MongoDB collection, can be found here (a “raw” cloud event json schema can be found here for comparison).

Note that MongoDB will automatically add an _id field (which is not used by Occurrent). The reason why the CloudEvent id attribute is not stored as _id in MongoDB is that the id of a CloudEvent is not globally unique! The combination of id and source is a globally unique CloudEvent. Note also that _id will not be included when the CloudEvent is read from an EventStore.

Here’s an example of what you can expect to see the “events” collection when storing events in an EventStore backed by MongoDB (given that TimeRepresentation is set to DATE):

{
	"_id : ObjectId("5f4112a348b8da5305e41f57"),
	"specversion" : "1.0",
	"id" : "bdb8481f-9e8e-443b-80a4-5ef787f0f227",
	"source" : "urn:occurrent:domain:numberguessinggame",
	"type" : "NumberGuessingGameWasStarted",
	"subject" : "a1fc6ba1-7cd4-45cf-8dcc-b357fe23956d",
	"time" : ISODate("2020-08-22T14:42:11.712Z"),
	"data" : {
		"secretNumberToGuess" : 8,
		"startedBy" : "003ab97b-df79-4bf1-8c0c-08a5dd3701cf",
		"maxNumberOfGuesses" : 5
	},
	"streamId" : "a1fc6ba1-7cd4-45cf-8dcc-b357fe23956d",
	"streamVersion" : NumberLong(1)
}
{
	"_id" : ObjectId("5f4112a548b8da5305e41f58"),
	"specversion" : "1.0",
	"id" : "c1bfc3a5-1716-43ae-88a6-297189b1b5c7",
	"source" : "urn:occurrent:domain:numberguessinggame",
	"type" : "PlayerGuessedANumberThatWasTooSmall",
	"subject" : "a1fc6ba1-7cd4-45cf-8dcc-b357fe23956d",
	"time" : ISODate("2020-08-22T14:42:13.336Z"),
	"data" : {
		"guessedNumber" : 1,
		"playerId" : "003ab97b-df79-4bf1-8c0c-08a5dd3701cf"
	},
	"streamId" : "a1fc6ba1-7cd4-45cf-8dcc-b357fe23956d",
	"streamVersion" : NumberLong(2)
}

MongoDB Time Representation

The CloudEvents specification says that the time attribute, if present, must adhere to the RFC 3339 specification. To accommodate this in MongoDB, the time attribute must be persisted as a String. This by itself is not a problem, a problem only arise if you want to make time-based queries on the events persisted to a MongoDB-backed EventStore (using the EventStoreQueries interface). This is, quite obviously, because time-based queries on String’s are suboptimal (to say the least) and may lead to surprising results. What we would like to do is to persist the time attribute as a Date in MongoDB, but MongoDB internally represents a Date with only millisecond resolution (see here) and then the CloudEvent cannot be compliant with the RFC 3339 specification in all circumstances.

Because of the reasons described above, users of a MongoDB-backed EventStore implementation, must decide how the time attribute is to be represented in MongoDB when instantiating an EventStore implementation. This is done by passing a value from the org.occurrent.mongodb.timerepresentation.TimeRepresentation enum to an EventStoreConfig object that is then passed to the EventStore implementation. TimeRepresentation has these values:

Value Description
RFC_3339_STRING     Persist the time attribute as an RFC 3339 string. This string is able to represent both nanoseconds and a timezone so this is recommended for apps that need to store this information or if you are uncertain of whether this is required in the future.

DATE

Persist the time attribute as a MongoDB Date. The benefit of using this approach is that you can do range queries etc on the “time” field on the cloud event. This can be really useful for certain types on analytics or projections (such as show the 10 latest number of started games) without writing any custom code.

Note that if you choose to go with RFC_3339_STRING you always have the option of adding a custom attribute, named for example “date”, in which you represent the “time” attribute as a Date when writing the events to an EventStore. This way you have the ability to use the “time” attribute to re-construct the CloudEvent time attribute exactly as well as the ability to do custom time-queries on the “date” attribute. However, you cannot use the methods involving time-based queries when using the EventStoreQueries interface.

Important: There’s yet another option! If you don’t need nanotime precision (i.e you’re fine with millisecond precision) and you’re OK with always representing the “time” attribute in UTC, then you can use TimeRepresentation.DATE without loss of precision! This is why, if DATE is configured for the EventStore, Occurrent will refuse to store a CloudEvent that specifies nanotime and is not defined in UTC (so that there won’t be any surprises). I.e. using DATE and then doing this will throw an IllegalArgumentException:

var cloudEvent = new CloudEventBuilder().time(OffsetDateTime.now()). .. .build();

// Will throw exception since OffsetDateTime.now() will include nanoseconds by default in Java 9+
eventStore.write(Stream.of(cloudEvent));

Instead, you need to remove nano seconds do like this explicitly:

// Remove millis and make sure to use UTC as timezone                                          
var now = OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS).withOffsetSameInstant(ZoneOffset.UTC);
var cloudEvent = new CloudEventBuilder().time(now). .. .build();

// Now you can write the cloud event
eventStore.write(Stream.of(cloudEvent));

For more thoughts on this, refer to the architecture decision record on time representation in MongoDB.

MongoDB Indexes

Each MongoDB EventStore implementation creates a few indexes for the “events collection” the first time they’re instantiated. These are:

Name Properties Description
streamId ascending An index for the streamId property. Allows for fast reads of all events in a particular stream.
id + source ascending id,
descending source,  
unique

Compound index of id and source to comply with the specification that the id+source combination must be unique.
streamId + streamVersion   ascending streamId,
descending streamVersion,
unique
Compound index of streamId and streamVersion (Occurrent CloudEvent extension) used for fast retrieval of the latest cloud event in a stream.

To allow for fast queries, for example when using EventStoreQueries, it’s recommended to create additional indexes tailored to the querying behavior of your application. See MongoDB indexes for more information on how to do this. If you have many adhoc queries it’s also worth checking out wildcard indexes which is a new feature in MongoDB 4.2. These allow you to create indexes that allow for arbitrary queries on e.g. the data attribute of a cloud event (if data is stored in json/bson format).

MongoDB EventStore Implementations

There are three different MongoDB EventStore implementations to choose from:

EventStore with MongoDB Native Driver

What is it?

An EventStore implementation that uses the “native” Java MongoDB synchronous driver (see website) to read and write CloudEvent’s to MongoDB.

When to use?

Use when you don’t need Spring support and want to use MongoDB as the underlying datastore.

Dependencies

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>eventstore-mongodb-native</artifactId>
    <version>0.1.1</version>
</dependency>
compile 'org.occurrent:eventstore-mongodb-native:0.1.1'
libraryDependencies += "org.occurrent" % "eventstore-mongodb-native" % "0.1.1"
@Grab(group='org.occurrent', module='eventstore-mongodb-native', version='0.1.1') 
[org.occurrent/eventstore-mongodb-native "0.1.1"]
'org.occurrent:eventstore-mongodb-native:jar:0.1.1'
<dependency org="org.occurrent" name="eventstore-mongodb-native" rev="0.1.1" />

Getting Started

Once you’ve imported the dependencies you create a new instance of org.occurrent.eventstore.mongodb.nativedriver.MongoEventStore. It takes four arguments, a MongoClient, the “database” and “event collection “that the EventStore will use to store events as well as an org.occurrent.eventstore.mongodb.nativedriver.EventStoreConfig.

For example:

MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
// How the CloudEvent "time" property will be serialized in MongoDB! !!Important!!
EventStoreConfig config = new EventStoreConfig(TimeRepresentation.RFC_3339_STRING);
String mongoDatabase = "database";
String mongoEventCollection = "events";

MongoEventStore eventStore = new MongoEventStore(mongoClient, mongoDatabase, mongoEventCollection, config);
val mongoClient = MongoClients.create("mongodb://localhost:27017")
// How the CloudEvent "time" property will be serialized in MongoDB! !!Important!!
val config = EventStoreConfig(TimeRepresentation.RFC_3339_STRING)
val mongoDatabase = "database"
val mongoEventCollection = "events"

val eventStore = MongoEventStore(mongoClient, mongoDatabase, mongoEventCollection, config)

Now you can start reading and writing events to the EventStore:

CloudEvent event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }")
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event));

// Read
EventStream<CloudEvent> eventStream = eventStore.read("streamId");
val event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }")
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event))

// Read
val eventStream : EventStream<CloudEvent> = eventStore.read("streamId")

Examples

Name Description
Number Guessing Game A simple game implemented using a pure domain model and stores events in MongoDB using MongoEventStore. It also generates integration events and publishes these to RabbitMQ.

EventStore with Spring MongoTemplate (Blocking)

What is it?

An implementation that uses Spring’s MongoTemplate to read and write events to/from MongoDB.

When to use?

If you’re already using Spring and you don’t need reactive support then this is a good choice. You can make use of the @Transactional annotation to write events and views in the same transaction (but make sure you understand what you’re going before attempting this).

Dependencies

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>eventstore-mongodb-spring-blocking</artifactId>
    <version>0.1.1</version>
</dependency>
compile 'org.occurrent:eventstore-mongodb-spring-blocking:0.1.1'
libraryDependencies += "org.occurrent" % "eventstore-mongodb-spring-blocking" % "0.1.1"
@Grab(group='org.occurrent', module='eventstore-mongodb-spring-blocking', version='0.1.1') 
[org.occurrent/eventstore-mongodb-spring-blocking "0.1.1"]
'org.occurrent:eventstore-mongodb-spring-blocking:jar:0.1.1'
<dependency org="org.occurrent" name="eventstore-mongodb-spring-blocking" rev="0.1.1" />

Getting Started

Once you’ve imported the dependencies you create a new instance of org.occurrent.eventstore.mongodb.spring.blocking.SpringBlockingMongoEventStore. It takes two arguments, a MongoTemplate and an org.occurrent.eventstore.mongodb.spring.blocking.EventStoreConfig.

For example:

MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
MongoTransactionManager mongoTransactionManager = new MongoTransactionManager(new SimpleMongoClientDatabaseFactory(mongoClient, "database"));

MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, "database");
EventStoreConfig eventStoreConfig = new EventStoreConfig.Builder()
                                                         // The collection where all events will be stored 
                                                        .eventStoreCollectionName("events")
                                                        .transactionConfig(mongoTransactionManager)
                                                        // How the CloudEvent "time" property will be serialized in MongoDB! !!Important!! 
                                                        .timeRepresentation(TimeRepresentation.RFC_3339_STRING)
                                                        .build();

SpringBlockingMongoEventStore eventStore = new SpringBlockingMongoEventStore(mongoTemplate, eventStoreConfig);
val mongoClient = MongoClients.create("mongodb://localhost:27017")
val mongoTransactionManager = MongoTransactionManager(SimpleMongoClientDatabaseFactory(mongoClient, "database"))

val mongoTemplate = MongoTemplate(mongoClient, "database")
val eventStoreConfig = EventStoreConfig.Builder()
                                        // The collection where all events will be stored
                                       .eventStoreCollectionName("events")
                                       .transactionConfig(mongoTransactionManager)
                                        // How the CloudEvent "time" property will be serialized in MongoDB! !!Important!!
                                       .timeRepresentation(TimeRepresentation.RFC_3339_STRING)
                                       .build()

val eventStore = SpringBlockingMongoEventStore(mongoTemplate, eventStoreConfig)

Now you can start reading and writing events to the EventStore:

CloudEvent event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }")
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event));

// Read
EventStream<CloudEvent> eventStream = eventStore.read("streamId");
val event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }")
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event))

// Read
val eventStream : EventStream<CloudEvent> = eventStore.read("streamId")

Examples

Name Description
Number Guessing Game A simple game implemented using a pure domain model and stores events in MongoDB using SpringBlockingMongoEventStore and Spring Boot. It also generates integration events and publishes these to RabbitMQ.
Subscription View An example showing how to create a subscription that listens to certain events stored in the EventStore and updates a view/projection from these events.
Transactional View An example showing how to combine writing events to the SpringBlockingMongoEventStore and update a view transactionally using the @Transactional annotation.
Custom Aggregation View Example demonstrating that you can query the SpringBlockingMongoEventStore using custom MongoDB aggregations.

EventStore with Spring ReactiveMongoTemplate (Reactive)

What is it?

An implementation that uses Spring’s ReactiveMongoTemplate to read and write events to/from MongoDB.

When to use?

If you’re already using Spring and want to use the reactive driver (project reactor) then this is a good choice. It uses the ReactiveMongoTemplate to write events to MongoDB. You can make use of the @Transactional annotation to write events and views in the same tx (but make sure that you understand what you’re going before attempting this).

Dependencies

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>eventstore-mongodb-spring-reactor</artifactId>
    <version>0.1.1</version>
</dependency>
compile 'org.occurrent:eventstore-mongodb-spring-reactor:0.1.1'
libraryDependencies += "org.occurrent" % "eventstore-mongodb-spring-reactor" % "0.1.1"
@Grab(group='org.occurrent', module='eventstore-mongodb-spring-reactor', version='0.1.1') 
[org.occurrent/eventstore-mongodb-spring-reactor "0.1.1"]
'org.occurrent:eventstore-mongodb-spring-reactor:jar:0.1.1'
<dependency org="org.occurrent" name="eventstore-mongodb-spring-reactor" rev="0.1.1" />

Getting Started

Once you’ve imported the dependencies you create a new instance of org.occurrent.eventstore.mongodb.spring.blocking.SpringBlockingMongoEventStore. It takes two arguments, a MongoTemplate and an org.occurrent.eventstore.mongodb.spring.blocking.EventStoreConfig.

For example:

MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
MongoTransactionManager mongoTransactionManager = new MongoTransactionManager(new SimpleMongoClientDatabaseFactory(mongoClient, "database"));

MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, "database");
EventStoreConfig eventStoreConfig = new EventStoreConfig.Builder()
                                                         // The collection where all events will be stored 
                                                        .eventStoreCollectionName("events")
                                                        .transactionConfig(mongoTransactionManager)
                                                        // How the CloudEvent "time" property will be serialized in MongoDB! !!Important!! 
                                                        .timeRepresentation(TimeRepresentation.RFC_3339_STRING)
                                                        .build();

SpringBlockingMongoEventStore eventStore = new SpringBlockingMongoEventStore(mongoTemplate, eventStoreConfig);
val mongoClient = MongoClients.create("mongodb://localhost:27017")
val mongoTransactionManager = MongoTransactionManager(SimpleMongoClientDatabaseFactory(mongoClient, "database"))

val mongoTemplate = MongoTemplate(mongoClient, "database")
val eventStoreConfig = EventStoreConfig.Builder()
                                        // The collection where all events will be stored
                                       .eventStoreCollectionName("events")
                                       .transactionConfig(mongoTransactionManager)
                                        // How the CloudEvent "time" property will be serialized in MongoDB! !!Important!!
                                       .timeRepresentation(TimeRepresentation.RFC_3339_STRING)
                                       .build()

val eventStore = SpringBlockingMongoEventStore(mongoTemplate, eventStoreConfig)

Now you can start reading and writing events to the EventStore:

CloudEvent event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }")
                    .build();

// Write                    
Mono<Void> mono = eventStore.write("streamId", Flux.just(event));

// Read
Mono<EventStream<CloudEvent>> eventStream = eventStore.read("streamId");
val event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }")
                    .build();

// Write                    
val mono = eventStore.write("streamId", Flux.just(event))

// Read
val eventStream : Mono<EventStream<CloudEvent>> = eventStore.read("streamId")

Examples

Name Description
Custom Aggregation View Example demonstrating that you can query the SpringBlockingMongoEventStore using custom MongoDB aggregations.

In-Memory EventStore

What is it?

A simple in-memory implementation of the EventStore interface.

When to use?

Mainly for testing purposes or for integration tests that doesn’t require a durable event store.

Dependencies

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>eventstore-inmemory</artifactId>
    <version>0.1.1</version>
</dependency>
compile 'org.occurrent:eventstore-inmemory:0.1.1'
libraryDependencies += "org.occurrent" % "eventstore-inmemory" % "0.1.1"
@Grab(group='org.occurrent', module='eventstore-inmemory', version='0.1.1') 
[org.occurrent/eventstore-inmemory "0.1.1"]
'org.occurrent:eventstore-inmemory:jar:0.1.1'
<dependency org="org.occurrent" name="eventstore-inmemory" rev="0.1.1" />

Getting Started

Once you’ve imported the dependencies you create a new instance of org.occurrent.eventstore.inmemory.InMemoryEventStore. For example:

InMemoryEventStore eventStore = new InMemoryEventStore();
val eventStore = InMemoryEventStore()

Now you can start reading and writing events to the EventStore:

CloudEvent event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }")
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event));

// Read
EventStream<CloudEvent> eventStream = eventStore.read("streamId");
val event = CloudEventBuilder.v1()
                    .withId("eventId")
                    .withSource(URI.create("urn:mydomain"))
                    .withType("HelloWorld")
                    .withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
                    .withSubject("subject")
                    .withDataContentType("application/json")
                    .withData("{ \"message\" : \"hello\" }")
                    .build();

// Write                    
eventStore.write("streamId", Stream.of(event))

// Read
val eventStream : EventStream<CloudEvent> = eventStore.read("streamId")

Note that InMemoryEventStore doesn’t support EventStoreQueries.

Using Subscriptions

Before you start using subscriptions you should read up on what they are here.

There a two different kinds of subscriptions, blocking subscriptions and reactive subscriptions. For blocking subscription implementations see here and for reactive subscription implementations see here.

Blocking Subscriptions

A “blocking subscription” is a subscription that uses the normal Java threading mechanism for IO operations, i.e. reading changes from an EventStore will block the thread. This is arguably the easiest and most familiar way to use subscriptions for the typical Java developer, and it’s probably good-enough for most scenarios. If high throughput, low CPU and memory-consumption is critical then consider using reactive subscription instead. Reactive subscriptions are also better suited if you want to work with streaming data.

All blocking subscriptions implements the org.occurrent.subscription.api.blocking.BlockingSubscription interface. This interface provide means to subscribe to new events from an EventStore as they are written. For example:

subscription.subscribe("mySubscriptionId", System.out::println);
subscription.subscribe("mySubscriptionId", ::println)

This will simply print each cloud event written to the event store to the console.

Note that the signature of subscribe is defined like this:

public interface BlockingSubscription<T extends CloudEvent> {
    /**
     * Start listening to cloud events persisted to the event store using the supplied start position and <code>filter</code>.
     *
     * @param subscriptionId  The id of the subscription, must be unique!
     * @param filter          The filter used to limit which events that are of interest from the EventStore.
     * @param startAtSupplier A supplier that returns the start position to start the subscription from.
     *                        This is a useful alternative to just passing a fixed "StartAt" value if the stream is broken and re-subscribed to.
     *                        In these cases, streams should be restarted from the latest persisted position and not the start position as it
     *                        were when the application was first started.
     * @param action          This action will be invoked for each cloud event that is stored in the EventStore.
     */
    Subscription subscribe(String subscriptionId, SubscriptionFilter filter, Supplier<StartAt> startAtSupplier, Consumer<T> action);

    // Default methods 

}

The type <T>, define the type of the CloudEvent that the subscription produce. It’s common that subscriptions produce “wrappers” around the vanilla io.cloudevents.CloudEvent type that includes the subscription position (if the datastore doesn’t maintain the subscription position on behalf of the clients). Someone, either you as the client or the datastore, needs to keep track of this position for each individual subscriber (“mySubscriptionId” in the example above). If the datastore doesn’t provide this feature, you should use a BlockingSubscription implementation that also implement the org.occurrent.subscription.api.blocking.PositionAwareBlockingSubscription interface. The PositionAwareBlockingSubscription is an example of a BlockingSubscription that returns a wrapper around io.cloudevents.CloudEvent called org.occurrent.subscription.CloudEventWithSubscriptionPosition which adds an additional method, SubscriptionPosition getStreamPosition(), that you can use to get
the current subscription position. Note that CloudEventWithSubscriptionPosition is fully compatible with io.cloudevents.CloudEvent and it’s ok to treat it as such. So given that you’re subscribing from a PositionAwareBlockingSubscription, you are responsible for keeping track of the subscription position, so that it’s possible to resume this subscription from the last known position on application restart. This interface also provides means to get the so called “current global subscription position”, by calling the globalSubscriptionPosition method which can be useful when starting a new subscription.

For example, consider the case when subscription “A” starts subscribing at the current time (T1). Event E1 is written to the EventStore and propagated to subscription “A”. But imagine there’s a bug in “A” that prevents it from performing its action. Later, the bug is fixed and the application is restarted at the “current time” (T2). But since T2 is after T1, E1 will not sent to “A” again since it happened before T2. Thus this event is missed! Whether or not this is actually a problem depends on your use case. But to avoid it you should not start the subscription at the “current time”, but rather from the “global subscription position”. This position should be written to a subscription position storage before subscription “A” is started. Thus the subscription can continue from this position on application restart and no events will be missed.

Blocking Subscription Filters

You can also provide a subscription filter, applied at the datastore level so that it’s really efficient, if you’re only interested in certain events:

subscription.subscribe("mySubscriptionId", filter(type("GameEnded")), System.out::println);
subscription.subscribe("mySubscriptionId", filter(type("GameEnded")), ::println)

This will print each cloud event written to the event store, and has type equal to “GameEnded”, to the console. The filter method is statically imported from org.occurrent.subscription.OccurrentSubscriptionFilter and type is statically imported from org.occurrent.condition.Condition. The OccurrentSubscriptionFilter is generic and should be applicable to a wide variety of different datastores. However, subscription implementations may provide different means to express filters. For example, the MongoDB subscription implementations allows you to use filters specific to MongoDB:

subscription.subscribe("mySubscriptionId", filter().id(Filters::eq, "3c0364c3-f4a7-40d3-9fb8-a4a62d7f66e3").type(Filters::eq, "GameStarted")), System.out::println);
subscription.subscribe("mySubscriptionId", filter().id(Filters::eq, "3c0364c3-f4a7-40d3-9fb8-a4a62d7f66e3").type(Filters::eq, "GameStarted")), ::println)

Now filter is statically imported from org.occurrent.subscription.mongodb.MongoDBFilterSpecification and Filters is imported from com.mongodb.client.model.Filters (i.e the normal way to express filters in MongoDB). However, it’s recommended to always start with an OccurrentSubscriptionFilter and only pick a more specific implementation if you cannot express your filter using the capabilities of OccurrentSubscriptionFilter.

Blocking Subscription Start Position

A subscription can can be started at different locations in the event store. You can define where to start when a subscription is started. This is done by supplying a org.occurrent.subscription.StartAt instance. It provides two ways to specify the start position, either by using StartAt.now() (default if StartAt is not defined when calling the subscribe function), or StartAt.subscriptionPosition(<subscriptionPosition>), where <subscriptionPosition> is a datastore-specific implementation of the org.occurrent.subscription.SubscriptionPosition interface which provides the start position as a String. You may want to store the String returned by a SubscriptionPosition in a database so that it’s possible to resume a subscription from the last processed position on application restart. You can do this anyway you like, but for most cases you probably should consider if there’s a Subscription Position Storage available that suits your needs. If not, you can still have a look at them for inspiration on how to write your own.

Blocking Subscription Position Storage

It’s very common that an application needs to start at its last known location in the subscription stream when it’s restarted. While you’re free to store the subscription position provided by a blocking subscription any way you like, Occurrent provides an interface called org.occurrent.subscription.api.blocking.BlockingSubscriptionPositionStorage acts as a uniform abstraction for this purpose. A BlockingSubscriptionPositionStorage is defined like this:

public interface BlockingSubscriptionPositionStorage {
    SubscriptionPosition read(String subscriptionId);
    SubscriptionPosition save(String subscriptionId, SubscriptionPosition subscriptionPosition);
    void delete(String subscriptionId);
}

I.e. it’s a way to read/write/delete the SubscriptionPosition for a given subscription. Occurrent ships with three pre-defined implementations:

1. BlockingSubscriptionPositionStorageForMongoDB
Uses the vanilla MongoDB Java (sync) driver to store SubscriptionPosition’s in MongoDB.

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-mongodb-native-blocking-position-storage</artifactId>
    <version>0.1.1</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-native-blocking-position-storage:0.1.1'
libraryDependencies += "org.occurrent" % "subscription-mongodb-native-blocking-position-storage" % "0.1.1"
@Grab(group='org.occurrent', module='subscription-mongodb-native-blocking-position-storage', version='0.1.1') 
[org.occurrent/subscription-mongodb-native-blocking-position-storage "0.1.1"]
'org.occurrent:subscription-mongodb-native-blocking-position-storage:jar:0.1.1'
<dependency org="org.occurrent" name="subscription-mongodb-native-blocking-position-storage" rev="0.1.1" />

2. SpringBlockingSubscriptionPositionStorageForMongoDB
Uses the Spring MongoTemplate to store SubscriptionPosition’s in MongoDB.

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-mongodb-spring-blocking-position-storage</artifactId>
    <version>0.1.1</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-spring-blocking-position-storage:0.1.1'
libraryDependencies += "org.occurrent" % "subscription-mongodb-spring-blocking-position-storage" % "0.1.1"
@Grab(group='org.occurrent', module='subscription-mongodb-spring-blocking-position-storage', version='0.1.1') 
[org.occurrent/subscription-mongodb-spring-blocking-position-storage "0.1.1"]
'org.occurrent:subscription-mongodb-spring-blocking-position-storage:jar:0.1.1'
<dependency org="org.occurrent" name="subscription-mongodb-spring-blocking-position-storage" rev="0.1.1" />

3. SpringBlockingSubscriptionPositionStorageForRedis
Uses the Spring RedisTemplate to store SubscriptionPosition’s in Redis.

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-redis-spring-blocking-position-storage</artifactId>
    <version>0.1.1</version>
</dependency>
compile 'org.occurrent:subscription-redis-spring-blocking-position-storage:0.1.1'
libraryDependencies += "org.occurrent" % "subscription-redis-spring-blocking-position-storage" % "0.1.1"
@Grab(group='org.occurrent', module='subscription-redis-spring-blocking-position-storage', version='0.1.1') 
[org.occurrent/subscription-redis-spring-blocking-position-storage "0.1.1"]
'org.occurrent:subscription-redis-spring-blocking-position-storage:jar:0.1.1'
<dependency org="org.occurrent" name="subscription-redis-spring-blocking-position-storage" rev="0.1.1" />

If you want to roll your own implementation (feel free to contribute to the project if you do) you can depend on the “blocking subscription API” which contains the BlockingSubscriptionPositionStorage interface:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-api-blocking</artifactId>
    <version>0.1.1</version>
</dependency>
compile 'org.occurrent:subscription-api-blocking:0.1.1'
libraryDependencies += "org.occurrent" % "subscription-api-blocking" % "0.1.1"
@Grab(group='org.occurrent', module='subscription-api-blocking', version='0.1.1') 
[org.occurrent/subscription-api-blocking "0.1.1"]
'org.occurrent:subscription-api-blocking:jar:0.1.1'
<dependency org="org.occurrent" name="subscription-api-blocking" rev="0.1.1" />

Blocking Subscription Implementations

These are the non-durable blocking subscription implementations:

MongoDB

It's important to recognize that MongoDB subscriptions are using the oplog so you need to make sure that you have enough oplog capacity to support your use case. You can also read more about this here. Typically this shouldn't be a problem, but if you have subscribers that risk falling behind, you may consider piping the events to e.g. Kafka and leverage it for these types of subscriptions. Note that you can always use a catch-up subscription to recover if this happens.

By “non-durable” we mean implementations that doesn’t store the subscription position in a durable storage automatically.
It might be that the datastore does this automatically or that subscription position storage is not required for your use case. If the datastore doesn’t support storing the subscription position automatically, a subscription will typically implement the org.occurrent.subscription.api.blocking.PositionAwareBlockingSubscription interface (since these types of subscriptions doesn’t need to be aware of the position).

Typically, if you want the stream to continue where it left off on application restart you want to store away the subscription position. You can do this anyway you like, but for most cases you probably want to look into implementations of org.occurrent.subscription.api.blocking.PositionAwareBlockingSubscription. These subscriptions can be combined with a subscription position storage implementation to store the position in a durable datastore.

Occurrent provides a utility that combines a PositionAwareBlockingSubscription and a BlockingSubscriptionPositionStorage (see here) to automatically store the subscription position
after each processed event. If you don’t want the position to be persisted after every event, it’s recommended to pick a subscription position storage implementation, and store the position yourself when you find fit.

Blocking Subscription using the “Native” Java MongoDB Driver

Uses the vanilla Java MongoDB synchronous driver (no Spring dependency is required).

To get started first include the following dependency:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-mongodb-native-blocking</artifactId>
    <version>0.1.1</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-native-blocking:0.1.1'
libraryDependencies += "org.occurrent" % "subscription-mongodb-native-blocking" % "0.1.1"
@Grab(group='org.occurrent', module='subscription-mongodb-native-blocking', version='0.1.1') 
[org.occurrent/subscription-mongodb-native-blocking "0.1.1"]
'org.occurrent:subscription-mongodb-native-blocking:jar:0.1.1'
<dependency org="org.occurrent" name="subscription-mongodb-native-blocking" rev="0.1.1" />

Then create a new instance of BlockingSubscriptionForMongoDB and start subscribing:

MongoTemplate mongoTemplate = mongoClient.getDatabase("some-database");
// Create the blocking subscription
BlockingSubscription<CloudEvent> subscriptions = new BlockingSubscriptionForMongoDB(database, "eventCollection", TimeRepresentation.DATE, Executors.newCachedThreadPool(), RetryStrategy.fixed(200));

// Now you can create subscriptions instances that subscribes to new events as they're written to an EventStore
subscriptions.subscribe("mySubscriptionId", cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent)); 

// You can later cancel the subscription by calling:
subscriptions.cancelSubscription("mySubscriptionId");
val database = mongoClient.getDatabase("some-database")
// Create the blocking subscription
val subscriptions = BlockingSubscriptionForMongoDB(database, "eventCollection", TimeRepresentation.DATE, Executors.newCachedThreadPool(), RetryStrategy.fixed(200))

// Now you can create subscriptions instances that subscribes to new events as they're written to an EventStore
subscriptions.subscribe("mySubscriptionId") { cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent) }

// You can later cancel the subscription by calling:
subscriptions.cancelSubscription("mySubscriptionId")
BlockingSubscriptionForMongoDB can be imported from the "org.occurrent.subscription.mongodb.nativedriver.blocking" package.

There are a few things to note here that needs explaining. First we have the TimeRepresentation.DATE that is passed as the third constructor argument which you can read more about here. Secondly we have the Executors.newCachedThreadPool(). A thread will be created from this executor for each call to “subscribe” (i.e. for each subscription). Make sure that you have enough threads to cover all your subscriptions or the “BlockingSubscription” may hang. Last we have the RetryStrategy which defines what should happen if there’s e.g. a connection issue during the life-time of a subscription or if subscription fails to process a cloud event (i.e. the action throws an exception). These retry strategies are available:

Name Description
none Don’t retry! Instead the subscription will fail fast and not continue if there’s an error.
fixed Retry operation after a fixed number of milliseconds or Duration.
backoff     Retry after with exponential backoff if an action throws an exception.

Note that you can provide a filter, start position and position persistence for this subscription implementation.

Blocking Subscription using Spring MongoTemplate

An implementation that uses Spring’s MongoTemplate for event subscriptions.

First include the following dependency:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-mongodb-spring-blocking</artifactId>
    <version>0.1.1</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-spring-blocking:0.1.1'
libraryDependencies += "org.occurrent" % "subscription-mongodb-spring-blocking" % "0.1.1"
@Grab(group='org.occurrent', module='subscription-mongodb-spring-blocking', version='0.1.1') 
[org.occurrent/subscription-mongodb-spring-blocking "0.1.1"]
'org.occurrent:subscription-mongodb-spring-blocking:jar:0.1.1'
<dependency org="org.occurrent" name="subscription-mongodb-spring-blocking" rev="0.1.1" />

Then create a new instance of SpringBlockingSubscriptionForMongoDB and start subscribing:

MongoTemplate mongoTemplate = ...
// Create the blocking subscription
BlockingSubscription<CloudEvent> subscriptions = new SpringBlockingSubscriptionForMongoDB(mongoTemplate, "eventCollectionName", TimeRepresentation.RFC_3339_STRING);

// Now you can create subscriptions instances that subscribes to new events as they're written to an EventStore
// Typically you do this after the Spring application context has finished loading. For example by subscribing to 
// the  (org.springframework.boot.context.event.ApplicationStartedEvent) or in a method annotated with (@PostConstruct) 
subscriptions.subscribe("mySubscriptionId", cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent)); 

// You can later cancel the subscription by calling:
subscriptions.cancelSubscription("mySubscriptionId");
val mongoTemplate : MongoTemplate = ... 
// Create the blocking subscription
val subscriptions = SpringBlockingSubscriptionForMongoDB(mongoTemplate, "eventCollectionName", TimeRepresentation.RFC_3339_STRING)

// Now you can create subscriptions instances that subscribes to new events as they're written to an EventStore
subscriptions.subscribe("mySubscriptionId") { cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent) }

// You can later cancel the subscription by calling:
subscriptions.cancelSubscription("mySubscriptionId")
SpringBlockingSubscriptionForMongoDB can be imported from the "org.occurrent.subscription.mongodb.spring.blocking" package.

The “eventCollectionName” specifies the event collection in MongoDB where events are stored. It’s important that this collection is the same as the collection used by the EventStore implementation. Secondly, we have the TimeRepresentation.RFC_3339_STRING that is passed as the third constructor argument, which you can read more about here. It’s also very important that this is configured the same way as the EventStore.

It should also be noted that Spring takes care of re-attaching to MongoDB if there’s a connection issue or other transient errors. This can be configured when creating the MongoTemplate instance.

When it comes to retries, if the “action” fails (i.e. if the higher-order function you provide when calling subscribe throws an exception), Occurrent doesn’t provide retries out of the box. This is most likely something you want to add, and since you’re using Spring you probably want to look into Spring Retry. For example consider that you have a simple Spring bean that writes each cloud event to a repository:

@Component
public class WriteToRepository {

	private final SomeRepository someRepository;

	public WriteToRepository(SomeRepository someRepository) {
		this.someRepository = someRepository;
	}

	public void write(CloudEvent cloudEvent) {
		someRepository.persist(cloudEvent);
	}
}

And you want to subscribe to all “GameStarted” events and write them to the repository:

WriteToRepository writeToRepository = ...
subscriptions.subscribe("gameStartedLog", writeToRepository::write);

But if the connection to someRepository is flaky you can add Spring Retry so allow for retry with exponential backoff:

@Component
public class WriteToRepository {

	private final SomeRepository someRepository;

	public WriteToRepository(SomeRepository someRepository) {
		this.someRepository = someRepository;
	}

    @Retryable(backoff = @Backoff(delay = 200, multiplier = 2, maxDelay = 30000))
	public void write(CloudEvent cloudEvent) {
		someRepository.persist(cloudEvent);
	}
}

Don’t forget to add @EnableRetry in to your Spring Boot application as well.

Note that you can provide a filter, start position and position persistence for this subscription implementation.

Automatic Subscription Position Persistence (Blocking)

Storing the subscription position is useful if you need to resume a subscription from its last known position when restarting an application. Occurrent provides a utility that implements BlockingSubscription and combines a PositionAwareBlockingSubscription and a BlockingSubscriptionPositionStorage implementation (see here) to automatically store the subscription position, by default,
after each processed event. If you don’t want the position to be persisted after every event, you can control how often this should happen by supplying a predicate to BlockingSubscriptionWithAutomaticPositionPersistenceConfig. There’s a pre-defined predicate, org.occurrent.subscription.util.predicate.EveryN, that allow
the position to be stored for every n event instead of simply every event. There’s also a shortcut, e.g. new BlockingSubscriptionWithAutomaticPositionPersistenceConfig(3) that creates an instance of EveryN that stores the position for every third event.

If you want full control, it’s recommended to pick a subscription position storage implementation, and store the position yourself using its API.

To use it, first we need to add the dependency:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-util-blocking-automatic-position-persistence</artifactId>
    <version>0.1.1</version>
</dependency>
compile 'org.occurrent:subscription-util-blocking-automatic-position-persistence:0.1.1'
libraryDependencies += "org.occurrent" % "subscription-util-blocking-automatic-position-persistence" % "0.1.1"
@Grab(group='org.occurrent', module='subscription-util-blocking-automatic-position-persistence', version='0.1.1') 
[org.occurrent/subscription-util-blocking-automatic-position-persistence "0.1.1"]
'org.occurrent:subscription-util-blocking-automatic-position-persistence:jar:0.1.1'
<dependency org="org.occurrent" name="subscription-util-blocking-automatic-position-persistence" rev="0.1.1" />

Then we should instantiate a PositionAwareBlockingSubscription, that subscribes to the events from the event store, and an instance of a BlockingSubscriptionPositionStorage, that stores the subscription position, and combine them to a BlockingSubscriptionWithAutomaticPositionPersistence:

// Create the non-durable blocking subscription instance 
PositionAwareBlockingSubscription nonDurableSubscription = ...
// Create the storage
BlockingSubscriptionPositionStorage storage = ...

// Now combine the non-durable subscription and the subscription position storage
BlockingSubscription<CloudEvent> durableSubscription = new BlockingSubscriptionWithAutomaticPositionPersistence(nonDurableSubscription, storage);

// Start a subscription
durableSubscription.subscribe("mySubscriptionId", cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent)); 
// Create the non-durable blocking subscription instance 
val nonDurableSubscription : PositionAwareBlockingSubscription = ...
// Create the storage
val storage : BlockingSubscriptionPositionStorage = ...

// Now combine the non-durable subscription and the subscription position storage
val durableSubscription : BlockingSubscription<CloudEvent> = new BlockingSubscriptionWithAutomaticPositionPersistence(nonDurableSubscription, storage)

// Start a subscription
durableSubscription.subscribe("mySubscriptionId") { cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent) };

Catch-up Subscription (Blocking)

When starting a new subscription it’s often useful to first replay historic events to get up-to-speed and then subscribing to new events as the arrive. A catch-up subscription allows for exactly this! It combines the EventStoreQueries API with a subscription and a subscription storage. It starts off by streaming historic events from the event store and then automatically switch to continuous streaming mode once the historic events have caught up.

To get start you need to add the following dependency:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-util-blocking-catchup-subscription</artifactId>
    <version>0.1.1</version>
</dependency>
compile 'org.occurrent:subscription-util-blocking-catchup-subscription:0.1.1'
libraryDependencies += "org.occurrent" % "subscription-util-blocking-catchup-subscription" % "0.1.1"
@Grab(group='org.occurrent', module='subscription-util-blocking-catchup-subscription', version='0.1.1') 
[org.occurrent/subscription-util-blocking-catchup-subscription "0.1.1"]
'org.occurrent:subscription-util-blocking-catchup-subscription:jar:0.1.1'
<dependency org="org.occurrent" name="subscription-util-blocking-catchup-subscription" rev="0.1.1" />

For example:

// Create the subscription position storage. Note that if PositionAwareBlockingSubscription
// is also storing the position, it's recommended to share the same BlockingSubscriptionPositionStorage instance.     
BlockingSubscriptionPositionStorage storage = ...
// Create the subscription instance that will be used once the replay has caught up the latest event position 
PositionAwareBlockingSubscription continuousSubscription = ...
// Instantiate an event store that implements the EventStoreQueries API
EventStoreQueries eventStoreQueries = ... 


// Now combine the continuous subscription and the subscription position storage to allow
CatchupSupportingBlockingSubscription catchupSubscription = new CatchupSupportingBlockingSubscription(continuousSubscription, eventStoreQueries, storage);

// Start a subscription that starts replaying events of type "GameEnded" from the beginning of time
catchupSubscription.subscribe("mySubscription", filter(type("GameEnded")), StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime()), cloudEvent -> System.out.println(cloudEvent));
// Create the subscription position storage. Note that if PositionAwareBlockingSubscription
// is also storing the position, it's recommended to share the same BlockingSubscriptionPositionStorage instance.     
BlockingSubscriptionPositionStorage storage = ...
// Create the subscription instance that will be used once the replay has caught up the latest event position 
PositionAwareBlockingSubscription continuousSubscription = ...
// Instantiate an event store that implements the EventStoreQueries API
EventStoreQueries eventStoreQueries = ... 

// Now combine the continuous subscription and the subscription position storage to allow
val catchupSubscription = CatchupSupportingBlockingSubscription(continuousSubscription, eventStoreQueries, storage, config)

// Start a subscription that starts replaying events of type "GameEnded" from the beginning of time
catchupSubscription.subscribe("mySubscription", filter(type("GameEnded")), StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime())) { cloudEvent ->
    println(cloudEvent)
}

CatchupSupportingBlockingSubscription maintains an in-memory cache of event ids. The size of this cache is configurable using a CatchupSupportingBlockingSubscriptionConfig but it defaults to 100. The purpose of the cache is to reduce the likely hood of duplicate events when switching from replay mode to continuous mode. Otherwise, there would be a chance that event written exactly when the switch from replay mode to continuous mode takes places, can be lost. To prevent this, the continuous mode subscription starts at a position before the last event read from the history. The purpose of the cache is thus to filter away events that are detected as duplicates during the switch. If the cache is too small, duplicate events will be sent to the continuous subscription. Typically, you want your application to be idempotent anyways and if so this shouldn’t be a problem.

By default, CatchupSupportingBlockingSubscription also stores the subscription position in the supplied storage so that, if the application crashes during replay mode, it doesn’t need to start replaying from the beginning again. Note that if you don’t want subscription persistence during replay, you can disable this by supplying an CatchupSupportingBlockingSubscriptionConfig instance to the CatchupSupportingBlockingSubscription constructor with a persistCloudEventPositionPredicate predicate that never stores the position:

Predicate<CloudEvent> neverStoreTheSubscriptionPosition = __ -> false;
CatchupSupportingBlockingSubscriptionConfig config = new CatchupSupportingBlockingSubscriptionConfig(100, neverStoreTheSubscriptionPosition);

Reactive Subscriptions

A “reactive subscription” is a subscription that uses non-blocking IO when reading events from the event store, i.e. reading changes from an EventStore will not block a thread. It uses concepts from reactive programming which is well-suited for working with streams of data. This is arguably a bit more complex for the typical Java developer, and you should consider using blocking subscriptions if high throughput, low CPU and memory-consumption is not critical.

All reactive subscriptions implements the org.occurrent.subscription.api.reactor.ReactorSubscription interface which uses components from project reactor. This interface provide means to subscribe to new events from an EventStore as they are written. For example:

subscription.subscribe("mySubscriptionId").doOnNext(System.out::println).subscribe();
subscription.subscribe("mySubscriptionId").doOnNext(::println).subscribe()
The "subscribe" method returns an instance of "Flux<CloudEvent>".

This will simply print each cloud event written to the event store to the console.

Note that the signature of subscribe is defined like this:

public interface ReactorSubscription<T extends CloudEvent> {

    /**
     * Stream events from the event store as they arrive and provide a function which allows to configure the
     * {@link T} that is used. Use this method if want to start streaming from a specific position.
     *
     * @return A Flux with cloud events which may also includes the SubscriptionPosition that can be used to resume the stream from the current position.
     */
    Flux<T> subscribe(SubscriptionFilter filter, StartAt startAt);

    // Default methods 

}

The type <T>, define the type of the CloudEvent that the subscription produce. It’s common that subscriptions produce “wrappers” around the vanilla io.cloudevents.CloudEvent type that includes the subscription position (if the datastore doesn’t maintain the subscription position on behalf of the clients). Someone, either you as the client or the datastore, needs to keep track of this position for each individual subscriber (“mySubscriptionId” in the example above). If the datastore doesn’t provide this feature, you should use a ReactorSubscription implementation that also implement the org.occurrent.subscription.api.reactor.PositionAwareReactorSubscription interface. The PositionAwareReactorSubscription is an example of a ReactorSubscription that returns a wrapper around io.cloudevents.CloudEvent called org.occurrent.subscription.CloudEventWithSubscriptionPosition which adds an additional method, SubscriptionPosition getStreamPosition(), that you can use to get
the current subscription position. Note that CloudEventWithSubscriptionPosition is fully compatible with io.cloudevents.CloudEvent and it’s ok to treat it as such. So given that you’re subscribing from a PositionAwareReactorSubscription, you are responsible for keeping track of the subscription position, so that it’s possible to resume this subscription from the last known position on application restart. This interface also provides means to get the so called “current global subscription position”, by calling the globalSubscriptionPosition method which can be useful when starting a new subscription.

For example, consider the case when subscription “A” starts subscribing at the current time (T1). Event E1 is written to the EventStore and propagated to subscription “A”. But imagine there’s a bug in “A” that prevents it from performing its action. Later, the bug is fixed and the application is restarted at the “current time” (T2). But since T2 is after T1, E1 will not sent to “A” again since it happened before T2. Thus this event is missed! Whether or not this is actually a problem depends on your use case. But to avoid it you should not start the subscription at the “current time”, but rather from the “global subscription position”. This position should be written to a subscription position storage before subscription “A” is started. Thus the subscription can continue from this position on application restart and no events will be missed.

Reactive Subscription Filters

You can also provide a subscription filter, applied at the datastore level so that it’s really efficient, if you’re only interested in certain events:

subscription.subscribe("mySubscriptionId", filter(type("GameEnded"))).doOnNext(System.out::println).subscribe();
subscription.subscribe("mySubscriptionId", filter(type("GameEnded")).doOnNext(::println).subscribe()

This will print each cloud event written to the event store, and has type equal to “GameEnded”, to the console. The filter method is statically imported from org.occurrent.subscription.OccurrentSubscriptionFilter and type is statically imported from org.occurrent.condition.Condition. The OccurrentSubscriptionFilter is generic and should be applicable to a wide variety of different datastores. However, subscription implementations may provide different means to express filters. For example, the MongoDB subscription implementations allows you to use filters specific to MongoDB:

subscription.subscribe("mySubscriptionId", filter().id(Filters::eq, "3c0364c3-f4a7-40d3-9fb8-a4a62d7f66e3").type(Filters::eq, "GameStarted")).doOnNext(System.out::println).subscribe();
subscription.subscribe("mySubscriptionId", filter().id(Filters::eq, "3c0364c3-f4a7-40d3-9fb8-a4a62d7f66e3").type(Filters::eq, "GameStarted")).doOnNext(::println).subscribe()

Now filter is statically imported from org.occurrent.subscription.mongodb.MongoDBFilterSpecification and Filters is imported from com.mongodb.client.model.Filters (i.e the normal way to express filters in MongoDB). However, it’s recommended to always start with an OccurrentSubscriptionFilter and only pick a more specific implementation if you cannot express your filter using the capabilities of OccurrentSubscriptionFilter.

Reactive Subscription Start Position

A subscription can can be started at different locations in the event store. You can define where to start when a subscription is started. This is done by supplying a org.occurrent.subscription.StartAt instance. It provides two ways to specify the start position, either by using StartAt.now() (default if StartAt is not defined when calling the subscribe function), or StartAt.subscriptionPosition(<subscriptionPosition>), where <subscriptionPosition> is a datastore-specific implementation of the org.occurrent.subscription.SubscriptionPosition interface which provides the start position as a String. You may want to store the String returned by a SubscriptionPosition in a database so that it’s possible to resume a subscription from the last processed position on application restart. You can do this anyway you like, but for most cases you probably should consider if there’s a Subscription Position Storage available that suits your needs. If not, you can still have a look at them for inspiration on how to write your own.

Reactive Subscription Position Storage

It’s very common that an application needs to start at its last known location in the subscription stream when it’s restarted. While you’re free to store the subscription position provided by a reactive subscription any way you like, Occurrent provides an interface called org.occurrent.subscription.api.reactor.ReactorSubscriptionPositionStorage acts as a uniform abstraction for this purpose. A ReactorSubscriptionPositionStorage is defined like this:

public interface ReactorSubscriptionPositionStorage {
    Mono<SubscriptionPosition> read(String subscriptionId);
    Mono<SubscriptionPosition> save(String subscriptionId, SubscriptionPosition subscriptionPosition);
    Mono<Void> delete(String subscriptionId);
}

I.e. it’s a way to read/write/delete the SubscriptionPosition for a given subscription. Occurrent ships one pre-defined reactive implementation (please contribute!):

1. SpringReactorSubscriptionPositionStorageForMongoDB
Uses the vanilla MongoDB Java (sync) driver to store SubscriptionPosition’s in MongoDB.

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-mongodb-spring-reactor-position-storage</artifactId>
    <version>0.1.1</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-spring-reactor-position-storage:0.1.1'
libraryDependencies += "org.occurrent" % "subscription-mongodb-spring-reactor-position-storage" % "0.1.1"
@Grab(group='org.occurrent', module='subscription-mongodb-spring-reactor-position-storage', version='0.1.1') 
[org.occurrent/subscription-mongodb-spring-reactor-position-storage "0.1.1"]
'org.occurrent:subscription-mongodb-spring-reactor-position-storage:jar:0.1.1'
<dependency org="org.occurrent" name="subscription-mongodb-spring-reactor-position-storage" rev="0.1.1" />

If you want to roll your own implementation (feel free to contribute to the project if you do) you can depend on the “reactive subscription API” which contains the ReactorSubscriptionPositionStorage interface:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-api-reactor</artifactId>
    <version>0.1.1</version>
</dependency>
compile 'org.occurrent:subscription-api-reactor:0.1.1'
libraryDependencies += "org.occurrent" % "subscription-api-reactor" % "0.1.1"
@Grab(group='org.occurrent', module='subscription-api-reactor', version='0.1.1') 
[org.occurrent/subscription-api-reactor "0.1.1"]
'org.occurrent:subscription-api-reactor:jar:0.1.1'
<dependency org="org.occurrent" name="subscription-api-reactor" rev="0.1.1" />

Reactive Subscription Implementations

These are the non-durable reactive subscription implementations:

MongoDB

It's important to recognize that MongoDB subscriptions are using the oplog so you need to make sure that you have enough oplog capacity to support your use case. You can also read more about this here. Typically this shouldn't be a problem, but if you have subscribers that risk falling behind, you may consider piping the events to e.g. Kafka and leverage it for these types of subscriptions. Note that you can always use a catch-up subscription to recover if this happens.

By “non-durable” we mean implementations that doesn’t store the subscription position in a durable storage automatically.
It might be that the datastore does this automatically or that subscription position storage is not required for your use case. If the datastore doesn’t support storing the subscription position automatically, a subscription will typically implement the org.occurrent.subscription.api.reactor.PositionAwareReactorSubscription interface (since these types of subscriptions doesn’t need to be aware of the position). However you can do this anyway you like.

Typically, if you want the stream to continue where it left off on application restart you want to store away the subscription position. You can do this anyway you like, but for most cases you probably want to look into implementations of org.occurrent.subscription.api.reactor.ReactorSubscriptionPositionStorage. These subscriptions can be combined with a subscription position storage implementation to store the position in a durable datastore.

Occurrent provides a utility that combines a PositionAwareReactorSubscription and a ReactorSubscriptionPositionStorage (see here) to automatically store the subscription position
after each processed event. If you don’t want the position to be persisted after every event, it’s recommended to pick a subscription position storage implementation, and store the position yourself when you find fit.

Reactive Subscription using Spring ReactiveMongoTemplate

An implementation that uses Spring’s ReactiveMongoTemplate for event subscriptions.

First include the following dependency:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-mongodb-spring-reactor</artifactId>
    <version>0.1.1</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-spring-reactor:0.1.1'
libraryDependencies += "org.occurrent" % "subscription-mongodb-spring-reactor" % "0.1.1"
@Grab(group='org.occurrent', module='subscription-mongodb-spring-reactor', version='0.1.1') 
[org.occurrent/subscription-mongodb-spring-reactor "0.1.1"]
'org.occurrent:subscription-mongodb-spring-reactor:jar:0.1.1'
<dependency org="org.occurrent" name="subscription-mongodb-spring-reactor" rev="0.1.1" />

Then create a new instance of SpringReactorSubscriptionForMongoDB and start subscribing:

ReactiveMongoTemplate reactiveMongoTemplate = ...
// Create the blocking subscription
BlockingSubscription<CloudEvent> subscriptions = new SpringReactorSubscriptionForMongoDB(reactiveMongoTemplate, "eventCollectionName", TimeRepresentation.RFC_3339_STRING);

// Now you can create subscriptions instances that subscribes to new events as they're written to an EventStore
// Typically you do this after the Spring application context has finished loading. For example by subscribing to 
// the  (org.springframework.boot.context.event.ApplicationStartedEvent) or in a method annotated with (@PostConstruct) 
subscriptions.subscribe("mySubscriptionId").flatMap(cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent)).subscribe(); 

// You can later cancel the subscription by calling:
subscriptions.cancelSubscription("mySubscriptionId");
val reactiveMongoTemplate : ReactiveMongoTemplate = ... 
// Create the blocking subscription
val subscriptions = SpringReactorSubscriptionForMongoDB(reactiveMongoTemplate, "eventCollectionName", TimeRepresentation.RFC_3339_STRING)

// Now you can create subscriptions instances that subscribes to new events as they're written to an EventStore
subscriptions.subscribe("mySubscriptionId").flatMap { cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent) }.subscribe()

// You can later cancel the subscription by calling:
subscriptions.cancelSubscription("mySubscriptionId")
SpringReactorSubscriptionForMongoDB can be imported from the "org.occurrent.subscription.mongodb.spring.reactor" package.

The “eventCollectionName” specifies the event collection in MongoDB where events are stored. It’s important that this collection is the same as the collection used by the EventStore implementation. Secondly, we have the TimeRepresentation.RFC_3339_STRING that is passed as the third constructor argument, which you can read more about here. It’s also very important that this is configured the same way as the EventStore.

It should also be noted that Spring takes care of re-attaching to MongoDB if there’s a connection issue or other transient errors. This can be configured when creating the ReactiveMongoTemplate instance.

Note that you can provide a filter, start position and position persistence for this subscription implementation.

Automatic Subscription Position Persistence (Reactive)

Storing the subscription position is useful if you need to resume a subscription from its last known position when restarting an application. Occurrent provides a utility that combines a PositionAwareReactorSubscription and a ReactorSubscriptionPositionStorage implementation (see here) to automatically store the subscription position, by default,
after each processed event. If you don’t want the position to be persisted after every event, you can control how often this should happen by supplying a predicate to ReactorSubscriptionWithAutomaticPositionPersistenceConfig. There’s a pre-defined predicate, org.occurrent.subscription.util.predicate.EveryN, that allow
the position to be stored for every n event instead of simply every event. There’s also a shortcut, e.g. new ReactorSubscriptionWithAutomaticPositionPersistenceConfig(3) that creates an instance of EveryN that stores the position for every third event.

To use it, first to add the following dependency:

<dependency>
    <groupId>org.occurrent</groupId>
    <artifactId>subscription-util-reactor-automatic-position-persistence</artifactId>
    <version>0.1.1</version>
</dependency>
compile 'org.occurrent:subscription-util-reactor-automatic-position-persistence:0.1.1'
libraryDependencies += "org.occurrent" % "subscription-util-reactor-automatic-position-persistence" % "0.1.1"
@Grab(group='org.occurrent', module='subscription-util-reactor-automatic-position-persistence', version='0.1.1') 
[org.occurrent/subscription-util-reactor-automatic-position-persistence "0.1.1"]
'org.occurrent:subscription-util-reactor-automatic-position-persistence:jar:0.1.1'
<dependency org="org.occurrent" name="subscription-util-reactor-automatic-position-persistence" rev="0.1.1" />

Then we should instantiate a PositionAwareReactorSubscription, that subscribes to the events from the event store, and an instance of a ReactorSubscriptionPositionStorage, that stores the subscription position, and combine them to a ReactorSubscriptionWithAutomaticPositionPersistence:

// Create the non-durable blocking subscription instance 
PositionAwareReactorSubscription nonDurableSubscription = ...
// Create the storage
ReactorSubscriptionPositionStorage storage = ...

// Now combine the non-durable subscription and the subscription position storage
ReactorSubscriptionWithAutomaticPositionPersistence durableSubscription = new ReactorSubscriptionWithAutomaticPositionPersistence(nonDurableSubscription, storage);

// Start a subscription
durableSubscription.subscribe("mySubscriptionId", cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent)).subscribe(); 
// Create the non-durable blocking subscription instance 
val nonDurableSubscription : PositionAwareReactorSubscription = ...
// Create the storage
val storage : ReactorSubscriptionPositionStorage = ...

// Now combine the non-durable subscription and the subscription position storage
val durableSubscription = new BlockingSubscriptionWithAutomaticPositionPersistence(nonDurableSubscription, storage)

// Start a subscription
durableSubscription.subscribe("mySubscriptionId") { cloudEvent -> 
    doSomethingWithTheCloudEvent(cloudEvent) 
}.subscribe()

Contact & Support

Would you like to contribute to Occurrent? That’s great! You should join the mailing-list or contribute on github. The mailing-list can also be used for support and discussions.

Credits

Thanks to Per Ökvist for discussions and ideas, and David Åse for letting me fork the awesome javalin website.