Documentation
The documentation on this page is always for the latest version of Occurrent, currently 0.19.6
.
Introduction
Occurrent is an event sourcing library, or if you wish, a set of event sourcing utilities for the JVM, created by Johan Haleby. There are many options for doing event sourcing in Java already so why build another one? There are a few reasons for this besides the intrinsic joy of doing something yourself:
- You should be able to design your domain model without any dependencies to Occurrent or any other library. Your domain model can be expressed with pure functions that returns events. Use Occurrent to store these events. This is a very important design decision! Many people talk about doing this, but I find it rare in practise, and some existing event sourcing frameworks makes this difficult or non-idiomatic.
- Simple: Pick only the libraries you need, no need for an all or nothing solution. If you don’t need subscriptions, then don’t use them! Use the infrastructure that you already have and hook these into Occurrent.
- Occurrent is not a database by itself. The goal is to be a thin wrapper around existing commodity databases that you may already be familiar with.
- Events are stored in a standard format (cloud events). You are responsible for serializing/deserializing the cloud events “body” (data) yourself.
While this may seem like a limitation at first, why not just serialize your POJO directly to arbitrary JSON like you’re used to?, it really enables a lot of use cases and peace of mind. For example:
- It should be possible to hook in various standard components into Occurrent that understands cloud events. For example a component could visualize a distributed tracing graph from the cloud events if using the distributed tracing cloud event extension.
- Since the current idea is to be as close as possible to the specification even in the database,
you can use the database to your advantage. For example, you can create custom indexes used for fast and fully consistent domain queries directly on an event stream (or even multiple streams).
- Composable: Function composition and pipes are encouraged. For example pipe the event stream to a rehydration function (any function that converts a stream of events to current state) before calling your domain model.
- Pragmatic: Need consistent projections? You can decide to write projections and events transactionally using tools you already know (such as Spring
@Transactional
)! - Interoperable/Portable: Cloud events is a CNCF specification for describing event data in a common way. CloudEvents seeks to dramatically simplify event declaration and delivery across services, platforms, and beyond!
- Use the Occurrent components as lego bricks to compose your own pipelines. Components are designed to be small so that you should be able to re-write them tailored to your own needs if required. Missing a component? You should be able to write one yourself and hook into the rest of the eco-system. Write your own problem/domain specific layer on-top of Occurrent.
Concepts
Event Sourcing
Every system needs to store and update data somehow. Many times this is done by storing the current state of an entity in the database.
For example, you might have an entity called Order
stored in a order
table in a relational database. Everytime something happens
to the order, the table is updated with the new information and replacing the previous values. Event Sourcing is a technique that instead stores
the changes, represented by events, that occurred for the entity. Events are facts, things that have happened, and they should never be updated.
This means that not only can you derive the current state from the set of historic events, but you also know which steps that were involved to reach
the current state.
CloudEvents
Cloud events is a CNCF specification for describing event data in a common way. CloudEvents seeks to dramatically simplify event declaration and delivery across services, platforms, and beyond. In Occurrent, you don’t persist your domain events directly to an event store, instead you convert them to a cloud event. You may regard a CloudEvent as a standardized envelope around the data in your domain event.
In practice, this means that instead of storing events in a proprietary or arbitrary format, Occurrent, stores events in accordance with the cloud event specification, even at the data-store level.
I.e. you know the structure of your events, even in the database that the event store uses. It’s up to you as a user of the library to convert your domain events into cloud events when
writing to the event store. This is extremely powerful, not only does it allow you to design your domains event in any way you find fit (for example without compromises enforced by a JSON serialization library) but it also allows for easier migration,
data consistency and features such as (fully-consistent) queries to the event store for certain use cases. A cloud event is made-up by a set of pre-defined attributes described in the cloud event specification.
In the context of event sourcing, we can leverage these attributes in the way suggested below:
Cloud Event Attribute Name |
Event Sourcing Nomenclature | Description |
---|---|---|
id | event id | The cloud event id attribute is used to store the id of a unique event in a particular context (“source”). Note that this id doesn’t necessarily need to be globally unique (but the combination of id and source must). Typically this would be a UUID. |
source | category | You can regard the “source” attribute as the “stream type” or a “category” for certain streams. For example, if you’re creating a game, you may have two kinds of aggregates in your bounded context, a “game” and a “player”. You can regard these as two different sources (categories). These are represented as URN’s, for example the “game” may have the source “urn:mycompany:mygame:game” and “player” may have “urn:mycompany:mygame:player”. This allows, for example, subscriptions to subscribe to all events related to any player (by using a subscription filter for the source attribute). |
subject | “subject” (~identifier) | A subject describes the event in the context of the source, typically an entity (aggregate) id that all events in the stream are related to. This property is optional (because Occurrent automatically adds the streamid attribute) and it’s possible that you may not need to add it. But it can be quite useful. For example, a stream may not necessarily, just hold contents of a single aggregate, and if so the subject can be used to distinguish between different aggregates/entities in a stream. Another example would be if you have multiple streams that represents different aspects of the same entity. For example, if you have a game where players are awarded points based on their performance in the game after the game has ended, you may decide to represent “point awarding” and “game play” as different streams, but they refer to the same “game id”. You can then use the “game id” as subject. |
type | event type | The type of the event. It may be enough to just use name of the domain event, such as “GameStarted” but you may also consider using a URN (e.g. “urn:mycompany:game:started”) or qualify it (“com.mycompany.game.started”). Note that you should try to avoid using the fully-qualified class name of the domain event since you’ll run into trouble if you’re moving the domain event to a different package. |
time | event time | The time when the event occurred (typically would be the application time and not the processing time) described by RFC 3339 (represented as java.time.OffsetDateTime by the CloudEvent SDK). |
datacontenttype | content-type | The content-type of the data attribute, typically you want to use “application/json”, which is also the default if you don’t specify any content-type at all. |
dataschema | schema | The URI to a schema describing the data in the cloud event (optional). |
data | event data | The actual data needed to represent your domain event, for example the contents of a GameStarted event. You can leave out this attribute entirely if your event is fully described by other attributes. |
Note that the table above is to be regarded as a rule of thumb, it’s ok to map things differently if it’s better suited for your application, but it’s a good idea to keep things consistent throughout your organization. To see an example of how this may look in code, refer to the application service documentation.
Occurrent CloudEvent Extensions
Occurrent automatically adds two extension attributes to each cloud event written to the event store:
Attribute Name | Type | Description |
---|---|---|
streamid |
String | An id that uniquely identifies a particular event stream. It’s used to determine which events belong to which stream. |
streamversion |
Long | The id of the stream version for a particular event. It’s used for optimistic concurrency control. |
These are required for Occurrent to operate. A long-term goal of Occurrent is to come up with a standardized set of cloud event extensions that are agreed upon and used by several different vendors.
In the meantime, it’s quite possible that Occurrent will provide a wider set of optional extensions in the future (such as correlation id and/or sequence number). But for now, it’s up to you as a user to add these if you need them (see CloudEvent Metadata), you would typically do this by creating or extending/wrapping an already existing application service.
CloudEvent Metadata
You can specify metadata to the cloud event by making use of extension attributes. This is the place to add things such as sequence number, correlation id, causation id etc. Actually there’s already a standard way of applying distributed tracing and sequence number generation extensions to cloud events that might be of interest.
EventStore
The event store is a place where you store events. Events are immutable pieces of data describing state changes for a particular stream. A stream is a collection of events that are related, typically but not limited to, a particular entity. For example a stream may include all events for a particular instance of a game or an order.
Occurrent provides an interface, EventStore
, that allows to read and write events from the database. The EventStore
interface is actually
composed of various smaller interfaces since not all databases supports all aspects provided by the EventStore
interface. Here’s an example
that writes a cloud event to the event store and read it back:
- Java
- Kotlin
CloudEvent event = CloudEventBuilder.v1()
.withId("eventId")
.withSource(URI.create("urn:mydomain"))
.withType("HelloWorld")
.withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
.withSubject("subject")
.withDataContentType("application/json")
.withData("{ \"message\" : \"hello\" }".getBytes(StandardCharsets.UTF_8))
.build();
// Write
eventStore.write("streamId", Stream.of(event));
// Read
EventStream<CloudEvent> eventStream = eventStore.read("streamId");
val event = CloudEventBuilder.v1()
.withId("eventId")
.withSource(URI.create("urn:mydomain"))
.withType("HelloWorld")
.withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
.withSubject("subject")
.withDataContentType("application/json")
.withData("{ \"message\" : \"hello\" }".toByteArray())
.build();
// Write
eventStore.write("streamId", Stream.of(event))
// Read
val eventStream : EventStream<CloudEvent> = eventStore.read("streamId")
Note that when reading the events, the EventStore
won’t simply return a Stream
of CloudEvent
’s, instead it returns a wrapper called EventStream
.
EventStream
The EventStream
contains the CloudEvent
’s for a stream and the version of the stream. The version can be used to guarantee that only one
thread/process is allowed to write to the stream at the same time, i.e. optimistic locking. This can be achieved by including the version in a write condition.
Note that reading a stream that doesn’t exist (e.g. eventStore.read("non-existing-id")
will return an instance of EventStream
with an empty stream of events and 0
as version number.
The reason for this is that you can use the same “application service” (a fancy word for a piece of code that loads events from the event store, applies them to the domain model and writes the new events returned to the event store)
for both entity creation and subsequent use cases. For example consider this simple domain model:
- Java
- Kotlin
public class WordGuessingGame {
public static Stream<CloudEvent> startNewGame(String gameId, String wordToGuess) {
...
}
public static Stream<CloudEvent> guessWord(Stream<CloudEvent> eventStream, String word) {
...
}
}
// Note that the functions could might as well be placed directly in a package
object WordGuessingGame {
fun startNewGame(gameId : String, wordToGuess : String) : Stream<CloudEvent> = ...
fun guessWord(eventStream : Stream<CloudEvent>, word : String) : Stream<CloudEvent> = ...
}
Then we could write a generic application service that takes a higher-order function (Stream<CloudEvent>) -> Stream<CloudEvent>
:
- Java
- Kotlin
public class ApplicationService {
private final EventStore eventStore;
public ApplicationService(EventStore eventStore) {
this.eventStore = eventStore;
}
public void execute(String streamId, Function<Stream<CloudEvent>, Stream<CloudEvent>> functionThatCallsDomainModel) {
// Read all events from the event store for a particular stream
EventStream<CloudEvent> eventStream = eventStore.read(streamId);
// Invoke the domain model
Stream<CloudEvent> newEvents = functionThatCallsDomainModel.apply(eventStream.events());
// Persist the new events
eventStore.write(streamId, eventStream.version(), newEvents);
}
}
class ApplicationService constructor (val eventStore : EventStore) {
fun execute(streamId : String, functionThatCallsDomainModel : (Stream<CloudEvent>) -> Stream<CloudEvent>) {
// Read all events from the event store for a particular stream
val eventStream : EventStream<CloudEvent> = eventStore.read(streamId)
// Invoke the domain model
val newEvents = functionThatCallsDomainModel(eventStream.events())
// Persist the new events
eventStore.write(streamId, eventStream.version(), newEvents)
}
}
You could then call the application service like this regardless of you’re starting a new game or not:
- Java
- Kotlin
// Here we image that we have received the data required to start a new game, e.g. from a REST endpoint.
String gameId = ...
String wordToGuess = ...;
// Then we invoke the application service to start a game:
applicationService.execute(gameId, __ -> WordGuessingGame.startNewGame(gameId, wordToGuess));
// Later a player guess a word:
String gameId = ...
String guess = ...;
// We thus invoke the application service again to guess the word:
applicationService.execute(gameId, events -> WordGuessingGame.guessWord(events, gameId, guess));
// Here we image that we have received the data required to start a new game, e.g. from a REST endpoint.
val gameId : String = ...
val wordToGuess : String = ...;
// Then we invoke the application service to start a game:
applicationService.execute(gameId) {
WordGuessingGame.startNewGame(gameId, wordToGuess)
}
// Later a player guess a word:
val gameId : String = ...
val guess : String = ...;
// We thus invoke the application service again to guess the word:
applicationService.execute(gameId, events -> WordGuessingGame.guessWord(events, gameId, guess));
Writing application services like this is both powerful and simple (once you get used to it). There’s less need for explicit commands and command handlers (the application service is a kind of command handler). You can also use other functional techniques such as partial application to make the code look, arguably, even nicer. It’s also easy to compose several calls to the domain model into one by using standard functional composition techniques. For example in this case you might consider both starting a game and let the player make her first guess from a single request to the REST API. No need to change the domain model to do this, just use function composition.
Write Condition
A “write condition” can be used to specify conditional writes to the event store. Typically, the purpose of this would be to achieve optimistic concurrency control (optimistic locking) of an event stream.
For example, image you have an Account
to which you can deposit and withdraw money. A business rule says that it’s not allowed to have a negative balance on an account.
Now imagine an account that is shared between two persons and contains 20 EUR. Person “A” wants to withdraw 15 EUR and person “B” wants to withdraw 10 EUR.
If they try to do this, an error message should be presented to one of them since the account balance would be negative. But what happens if both persons try to withdraw
the money at the same time? Let’s have a look:
- Java
- Kotlin
// Person A at _time 1_
EventStream<CloudEvent> eventStream = eventStore.read("account1"); // A
// "withdraw" is a pure function in the Account domain model which takes a Stream
// of all current events and the amount to withdraw, and returns new events.
// In this case, a "MoneyWasWithdrawn" event is returned, since 15 EUR is OK to withdraw.
Stream<CloudEvent> events = Account.withdraw(eventStream.events(), Money.of(15, EUR));
// We write the new events to the event store
eventStore.write("account1", events);
// Now in a different thread let's imagine Person B at _time 1_
EventStream<CloudEvent> eventStream = eventStore.read("account1"); // B
// Again we want to withdraw money, and the system will think this is OK,
// since event streams for A and B has not yet recorded that the balance is negative.
Stream<CloudEvent> events = Account.withdraw(eventStream.events(), Money.of(10, EUR));
// We write the new events to the event store without any problems! 😱
// But this shouldn't work since it would violate the business rule!
eventStore.write("account1", events);
// Person A at _time 1_
val eventStream = eventStore.read("account1") // A
// "withdraw" is a pure function in the Account domain model which takes a Stream
// of all current events and the amount to withdraw. It returns a stream of
// new events, in this case only a "MoneyWasWithdrawn" event, since 15 EUR is OK to withdraw.
val events = Account.withdraw(eventStream.events(), Money.of(15, EUR))
// We write the new events to the event store
eventStore.write("account1", events)
// Now in a different thread let's imagine Person B at _time 1_
val eventStream = eventStore.read("account1") // B
// Again we want to withdraw money, and the system will think this is OK,
// since the Account thinks that 10 EUR will have a balance of 10 EUR after
// the withdrawal.
val events = Account.withdraw(eventStream.events(), Money.of(10, EUR))
// We write the new events to the event store without any problems! 😱
// But this shouldn't work since it would violate the business rule!
eventStore.write("account1", events)
To avoid the problem above we want to make use of conditional writes. Let’s see how:
- Java
- Kotlin
// Person A at _time 1_
EventStream<CloudEvent> eventStream = eventStore.read("account1"); // A
long currentVersion = eventStream.version();
// Withdraw money
Stream<CloudEvent> events = Account.withdraw(eventStream.events(), Money.of(15, EUR));
// We write the new events to the event store with a write condition that implies
// that the version of the event stream must be A.
eventStore.write("account1", currentVersion, events);
// Now in a different thread let's imagine Person B at _time 1_
EventStream<CloudEvent> eventStream = eventStore.read("account1"); // A
long currentVersion = eventStream.version();
// Again we want to withdraw money, and the system will think this is OK,
// since event streams for A and B has not yet recorded that the balance is negative.
Stream<CloudEvent> events = Account.withdraw(eventStream.events(), Money.of(10, EUR));
// We write the new events to the event store with a write condition that implies
// that the version of the event stream must be B. But now Occurrent will throw
// a "org.occurrent.eventstore.api.WriteConditionNotFulfilledException" since, in this
// case A was slightly faster, and the version of the event stream no longer match!
// The entire operation should be retried for person B and when "Account.withdraw(..)"
// is called again it could throw a "CannotWithdrawSinceBalanceWouldBeNegative" exception.
eventStore.write("account1", currentVersion, events);
// Person A at _time 1_
val eventStream = eventStore.read("account1") // A
val currentVersion = eventStream.version()
// Withdraw money
val events = Account.withdraw(eventStream.events(), Money.of(15, EUR));
// We write the new events to the event store with a write condition that implies
// that the version of the event stream must be A.
eventStore.write("account1", currentVersion, events)
// Now in a different thread let's imagine Person B at _time 1_
val eventStream = eventStore.read("account1"); // A
val currentVersion = eventStream.version()
// Again we want to withdraw money, and the system will think this is OK,
// since event streams for A and B has not yet recorded that the balance is negative.
val events = Account.withdraw(eventStream.events(), Money.of(10, EUR))
// We write the new events to the event store with a write condition that implies
// that the version of the event stream must be B. But now Occurrent will throw
// a "org.occurrent.eventstore.api.WriteConditionNotFulfilledException" since, in this
// case A was slightly faster, and the version of the event stream no longer match!
// The entire operation should be retried for person B and when "Account.withdraw(..)"
// is called again it could throw a "CannotWithdrawSinceBalanceWouldBeNegative" exception.
eventStore.write("account1", currentVersion, events)
What you’ve seen above is a simple, but widely used, form of write condition. Actually, doing eventStore.write("streamId", version, events)
is just a shortcut for:
- Java
- Kotlin
eventStore.write("streamId", WriteCondition.streamVersionEq(version), events);
eventStore.write("streamId", WriteCondition.streamVersionEq(version), events)
But you can compose a more advanced write condition using a Condition
:
- Java
- Kotlin
eventStore.write("streamId", WriteCondition.streamVersion(and(lt(10), ne(5)), events);
eventStore.write("streamId", WriteCondition.streamVersion(and(lt(10), ne(5)), events)
where lt
, ne
and and
is statically imported from org.occurrent.condition.Condition
.
EventStore Queries
Since Occurrent builds on-top of existing databases it’s ok, given that you know what you’re doing*, to use the strengths of these databases.
One such strength is that databases typically have good querying support. Occurrent exposes this with the EventStoreQueries
interface
that an EventStore implementation may implement to expose querying capabilities. For example:
- Java
- Kotlin
OffsetDateTime lastTwoHours = OffsetDateTime.now().minusHours(2);
// Query the database for all events the last two hours that have "subject" equal to "123" and sort these in descending order
Stream<CloudEvent> events = eventStore.query(subject("123").and(time(gte(lastTwoHours))), SortBy.time(DESCENDING));
val lastTwoHours = OffsetDateTime.now().minusHours(2);
// Query the database for all events the last two hours that have "subject" equal to "123" and sort these in descending order
val events : Stream<CloudEvent> = eventStore.query(subject("123").and(time(gte(lastTwoHours))), SortBy.time(DESCENDING))
The subject
and time
methods are statically imported from org.occurrent.filter.Filter
and lte
is statically imported from org.occurrent.condition.Condition
.
EventStoreQueries
is not bound to a particular stream, rather you can query any stream (or multiple streams at the same time).
It also provides the ability to get an “all” stream:
- Java
- Kotlin
// Return all events in an event store sorted by descending order
Stream<CloudEvent> events = eventStore.all(SortBy.time(DESCENDING));
// Return all events in an event store sorted by descending order
val events : Stream<CloudEvent> = eventStore.all(SortBy.time(DESCENDING))
The EventStoreQueries
interface also supports skip and limit capabilities which allows for pagination:
- Java
- Kotlin
// Skip 42, limit 1024
Stream<CloudEvent> events = eventStore.all(42, 1024);
// Skip 42, limit 1024
val events : Stream<CloudEvent> = eventStore.all(42, 1024)
To get started with an event store refer to Choosing An EventStore.
Subscriptions
A subscription is a way to get notified when new events are written to an event store. Typically, a subscription will be used to create views from events (such as projections, sagas, snapshots etc) or
create integration events that can be forwarded to another piece of infrastructure such as a message bus. There are two different kinds of API’s, the first one is a blocking API
represented by the org.occurrent.subscription.api.blocking.SubscriptionModel
interface (in the org.occurrent:subscription-api-blocking
module), and second one is a reactive API
represented by the org.occurrent.subscription.api.reactor.SubscriptionModel
interface (in the org.occurrent:subscription-api-reactor
module).
The blocking API is callback based, which is fine if you’re working with individual events (you can of course use a simple function that aggregates events into batches yourself).
If you want to work with streams of data, the reactor SubscriptionModel
is probably a better option since it’s using the Flux
publisher from project reactor.
Note that it’s fine to use reactive SubscriptionModel
, even though the event store is implemented using the blocking api, and vice versa.
If the datastore allows it, you can also run subscriptions in a different process than the processes reading and writing to the event store.
To get started with subscriptions refer to Using Subscriptions.
EventStore Operations
Occurrent event store implementations may optionally also implement the EventStoreOperations
interface. It provides means to delete a specific event, or an entire
event stream. For example:
- Java
- Kotlin
// Delete an entire event stream
eventStoreOperations.deleteEventStream("streamId");
// Delete a specific event
eventStoreOperations.deleteEvent("cloudEventId", cloudEventSource);
// This will delete all events in stream "myStream" that has a version less than or equal to 19.
eventStoreOperations.delete(streamId("myStream").and(streamVersion(lte(19L)));
// Delete an entire event stream
eventStoreOperations.deleteEventStream("streamId")
// Delete a specific event
eventStoreOperations.deleteEvent("cloudEventId", cloudEventSource)
// This will delete all events in stream "myStream" that has a version less than or equal to 19.
eventStoreOperations.delete(streamId("myStream").and(streamVersion(lte(19L)))
These are probably operations that you want to use sparingly. Typically, you never want to remove any events, but there are some cases, such as GDPR or other regulations, that requires the deletion of an event or an entire event stream. You should be aware that there are other ways to solve this though. One way would be to encrypt personal data and throw away the key when the user no longer uses the service. Another would be to store personal data outside the event store.
Another reason for deleting events is if you’re implementing something like “closing the books” or certain types of snapshots, and don’t need the old events anymore.
Another feature provided by EventStoreOperations
is the ability to update an event. Again, this is not something you normally want to do, but it can be useful for
certain strategies of GDPR compliance. For example maybe you want to remove or update personal data in an event when a users unregisters from your service. Here’s an example:
- Java
- Kotlin
eventStoreOperations.updateEvent("cloudEventId", cloudEventSource, cloudEvent -> {
return CloudEventBuilder.v1(cloudEvent).withData(removePersonalDetailsFrom(cloudEvent)).build();
});
eventStoreOperations.updateEvent("cloudEventId", cloudEventSource) { cloudEvent ->
CloudEventBuilder.v1(cloudEvent).withData(removePersonalDetailsFrom(cloudEvent)).build()
})
Views
Occurrent doesn’t have any special components for creating views/projections. Instead, you simply create a subscription in which you can create and store the view as you find fit. But this doesn’t have to be difficult! Here’s a trivial example of a view that maintains the number of ended games. It does so by inceasing the “numberOfEndedGames” field in an (imaginary) database for each “GameEnded” event that is written to the event store:
- Java
- Kotlin
// An imaginary database API
Database someDatabase = ...
// Subscribe to all "GameEnded" events by starting a subscription named "my-view"
// and increase "numberOfEndedGames" for each ended game.
subscriptionModel.subscribe("my-view", filter(type("GameEnded")), cloudEvent -> someDatabase.inc("numberOfEndedGames"));
// An imaginary database API
val someDatabase : Database = ...
// Subscribe to all "GameEnded" events by starting a subscription named "my-view"
// and increase "numberOfEndedGames" for each ended game.
subscriptionModel.subscribe("my-view", filter(type("GameEnded"))) {
someDatabase.inc("numberOfEndedGames")
}
Where filter
is imported from org.occurrent.subscription.OccurrentSubscriptionFilter
and type
is imported from org.occurrent.condition.Condition
.
While this is a trivial example it shouldn’t be difficult to create a view that is backed by a JPA entity in a relational database based on a subscription.
Commands
A command is used to represent an intent in an event sourced system, i.e. something that you want to do. They’re different, in a very important way, from events in that commands
can fail or be rejected, where-as events cannot. A typical example of a command would be a data structure whose name is defined as an imperative verb, for example PlaceOrder
.
The resulting event, if the command is processed successfully, could then be OrderPlaced
. However, in Occurrent, as explained in more detail in the Command Philosophy
section below, you may start off by not using explicit data structures for commands unless you want to. In Occurrent, you can instead use pure functions
to represent commands and command handling. Combine this with function composition and you have a powerful way to invoke the domain model (refer to the application service for examples).
Command Philosophy
Occurrent doesn’t contain a built-in command bus. Instead, you’re encouraged to pick any infrastructure component you need to act as the command bus to send commands to another service. Personally, I typically make a call to a REST API or make an RPC invocation instead of using a distributed command bus that routes the commands to my aggregate. There are of course exceptions to this, such as the need for location transparency or if you’re using Decider’s. If you need location transparency, a command bus or an actor model can be of help. But I would argue that you may not always need the complexity by prematurely going down this route if your business requirements doesn’t point you in this direction. Decider’s are a nice alternative, that doesn’t require any additional infrastructure.
But what about internally? For example if a service exposes a REST API and upon receiving a request it publishes a command that’s somehow picked up and routed to a function in your domain model. This is where an application service becomes useful. However, let’s first explore the rationale behind the philosophy of Occurrent. In other frameworks, it’s not uncommon that you define your domain model like this:
- Java
- Kotlin
public class WordGuessingGame extends AggregateRoot {
@AggregateId
private String gameId;
private String wordToGuess;
@HandleCommand
public void handle(StartNewGameCommand startNewGameCommand) {
// Insert some validation and logic
...
// Publish an event using the "publish" method from AggregateRoot
publish(new WordGuessingGameWasStartedEvent(...));
}
@HandleCommand
public void handle(GuessWordCommand guessWordCommand) {
// Some validation and implementation ...
...
// Publish an event using the "publish" method from AggregateRoot
publish(new WordGuessedEvent(...));
}
@HandleEvent
public void handle(WordGuessingGameWasStartedEvent e) {
this.gameId = e.getGameId();
this.wordToGuess = e.getWordToGuess();
}
...
}
class WordGuessingGame : AggregateRoot() {
@AggregateId
var gameId : String
var wordToGuess : String
@HandleCommand
fun handle(startNewGameCommand : StartNewGameCommand) {
// Insert some validation and logic
...
// Publish an event using the "publish" method from AggregateRoot
publish(WordGuessingGameWasStartedEvent(...))
}
@HandleCommand
fun handle(guessWordCommand : GuessWordCommand) {
// Some validation and implementation ...
...
// Publish an event using the "publish" method from AggregateRoot
publish(WordGuessedEvent(...))
}
@HandleEvent
fun handle(e : WordGuessingGameWasStartedEvent) {
gameId = e.getGameId()
wordToGuess = e.getWordToGuess()
}
...
}
Let’s look at a “command” and see what it typically looks like in these frameworks:
- Java
- Kotlin
public class StartNewGameCommand {
@AggregateId
private String gameId;
private String wordToGuess;
public void setGameId(String gameId) {
this.gameId = gameId;
}
public String getGameId() {
return gameId;
}
public void setWordToGuess(String wordToGuess) {
this.gameId = gameId;
}
public String getWordToGuess() {
return wordToGuess;
}
// Equals/hashcode/tostring methods are excluded for breivty
}
data class StartNewGameCommand(@AggregateId var gameId: String, val wordToGuess : String)
Now that we have our WordGuessingGame
implementation and a command we can dispatch it to a command bus:
- Java
- Kotlin
commandbus.dispatch(new StartNewGameCommand("someGameId", "Secret word"));
commandbus.dispatch(StartNewGameCommand("someGameId", "Secret word"))
From a typical Java perspective one could argue that this is not too bad. But it does have a few things one could improve upon from a broader perspective:
- The
WordGuessingGame
is complecting several things that may be modelled separately. Data, state, behavior, command- and event routing and event publishing are all defined in the same model (theWordGuessingGame
class). It also uses framework specific annotations, classes and inheritance inside your domain model which is something you want to avoid. For small examples like this it arguably doesn’t matter, but if you have complex logic and a large system, it probably will in my experience. Keeping state and behavior separate allows for easier testing, referential transparency and function composition. It allows treating the state as a value which has many benefits. - Commands are defined as explicit data structures with framework-specific annotations when arguably they don’t have to. This is fine if you need to serialize the command (in order to send it to another location or to schedule it for the future) but one could argue that you don’t want to couple your commands to some infrastructure. This is of course a trade-off, in Occurrent you’re free to choose any approach you like (i.e. commands/functions can be completely free of framework/library/infrastructure concerns).
Commands in Occurrent
So how would one dispatch commands in Occurrent? As we’ve already mentioned there’s nothing stopping you from using a (distributed) command bus or to create explicit commands, and dispatch them the way we did in the example above. For example, if you’re using Decider’s, it could be nice to have commands explicitly defined. But if you recognize some of the points described above and are looking for a simpler approach, here’s another way to go about. First let’s refactor the domain model to pure functions, without any state or dependencies to Occurrent or any other library/framework.
- Java
- Kotlin
public class WordGuessingGame {
public static Stream<DomainEvent> startNewGame(String gameId, String wordToGuess) {
...
}
public static Stream<DomainEvent> guessWord(Stream<DomainEvent> eventStream, String word) {
...
}
}
// Note that the functions could might as well be placed directly in a package.
// You might also want to use a Kotlin Sequence or List instead of a Stream
object WordGuessingGame {
fun startNewGame(gameId : String, wordToGuess : String) : Stream<CloudEvent> = ...
fun guessWord(eventStream : Stream<DomainEvent>, word : String) : Stream<DomainEvent> = ...
}
If you define your behavior like this it’ll be easy to test (and also to compose using normal function composition techniques). There are no side-effects (such as publishing events) which also allows for easier testing and local reasoning.
But where are our commands!? In this example we’ve decided to represent them as functions. I.e. the “command” is modeled as simple function, e.g. startNewGame
!
This means that the command handling logic is handled by function as well. You don’t need to switch/match over the command since you directly invoke the function itself.
Again, you may prefer to actually define your commands explicitly, but in this example we’ll just be using normal functions.
But wait, how are these functions called? Create or copy a generic ApplicationService
class like the one below
(or use the generic application service provided by Occurrent):
- Java
- Kotlin
public class ApplicationService {
private final EventStore eventStore;
private final Function<CloudEvent, DomainEvent> convertCloudEventToDomainEvent;
private final Function<DomainEvent, CloudEvent> convertDomainEventToCloudEvent;
public ApplicationService(EventStore eventStore,
Function<CloudEvent, DomainEvent> convertCloudEventToDomainEvent,
Function<DomainEvent, CloudEvent> convertDomainEventToCloudEvent) {
this.eventStore = eventStore;
this.convertCloudEventToDomainEvent = convertCloudEventToDomainEvent;
this.convertDomainEventToCloudEvent = convertDomainEventToCloudEvent;
}
public void execute(String streamId, Function<Stream<DomainEvent>, Stream<DomainEvent>> functionThatCallsDomainModel) {
// Read all events from the event store for a particular stream
EventStream<CloudEvent> eventStream = eventStore.read(streamId);
// Convert the cloud events into domain events
Stream<DomainEvent> domainEventsInStream = eventStream.events().map(convertCloudEventToDomainEvent);
// Call a pure function from the domain model which returns a Stream of domain events
Stream<DomainEvent> newDomainEvents = functionThatCallsDomainModel.apply(domainEventsInStream);
// Convert domain events to cloud events and write them to the event store
eventStore.write(streamId, eventStream.version(), newDomainEvents.map(convertDomainEventToCloudEvent));
}
}
class ApplicationService constructor (val eventStore : EventStore,
val convertCloudEventToDomainEvent : (CloudEvent) -> DomainEvent,
val convertDomainEventToCloudEvent : (DomainEvent) -> CloudEvent) {
fun execute(streamId : String, functionThatCallsDomainModel : (Stream<DomainEvent>) -> Stream<DomainEvent>) {
// Read all events from the event store for a particular stream
val eventStream : EventStream<CloudEvent> = eventStore.read(streamId)
// Convert the cloud events into domain events
val domainEventsInStream : Stream<DomainEvent> = eventStream.events().map(convertCloudEventToDomainEvent)
// Call a pure function from the domain model which returns a Stream of domain events
val newDomainEvents = functionThatCallsDomainModel(domainEventsInStream)
// Convert domain events to cloud events and write them to the event store
eventStore.write(streamId, eventStream.version(), newDomainEvents.map(convertDomainEventToCloudEvent))
}
}
and then use the ApplicationService
like this:
- Java
- Kotlin
// A function that converts a CloudEvent to a "domain event"
Function<CloudEvent, DomainEvent> convertCloudEventToDomainEvent = ..
// A function that a "domain event" to a CloudEvent
Function<DomainEvent, CloudEvent> convertDomainEventToCloudEvent = ..
EventStore eventStore = ..
ApplicationService applicationService = new ApplicationService(eventStore, convertCloudEventToDomainEvent, convertDomainEventToCloudEvent);
// Now in your REST API use the application service:
String gameId = ... // From a form parameter
String wordToGuess = .. // From a form parameter
applicationService.execute(gameId, events -> WordGuessingGame.startNewGame(gameId, wordToGuess));
// A function that converts a CloudEvent to a "domain event"
val convertCloudEventToDomainEvent : (CloudEvent) -> DomainEvent = ..
// A function that a "domain event" to a CloudEvent
val convertDomainEventToCloudEvent = (DomainEvent) -> CloudEvent = ..
val eventStore : EventStore = ..
val applicationService = ApplicationService(eventStore, convertCloudEventToDomainEvent, convertDomainEventToCloudEvent);
// Now in your REST API use the application service:
val gameId = ... // From a form parameter
val wordToGuess = .. // From a form parameter
applicationService.execute(gameId) { events ->
WordGuessingGame.startNewGame(gameId, wordToGuess)
}
We’re leveraging higher-order functions instead of using explicit commands.
Command Composition
Many times it’s useful to compose multiple commands into a single unit-of-work for the same stream/aggregate. What this means that you’ll “merge” several commands into one, and they will be executed in an atomic fashion. I.e. either all commands succeed, or all commands fail.
While you’re free to use any means and/or library to achieve this, Occurrent ships with a “command composition” library that you can leverage:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>command-composition</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:command-composition:0.19.6'
libraryDependencies += "org.occurrent" % "command-composition" % "0.19.6"
@Grab(group='org.occurrent', module='command-composition', version='0.19.6')
[org.occurrent/command-composition "0.19.6"]
'org.occurrent:command-composition:jar:0.19.6'
<dependency org="org.occurrent" name="command-composition" rev="0.19.6" />
As an example consider this simple domain model:
- Java
- Kotlin
public class WordGuessingGame {
public Stream<DomainEvent> startNewGame(Stream<DomainEvents> events, String gameId, String wordToGuess) {
// Implementation
}
public Stream<DomainEvent> guessWord(Stream<DomainEvents> events, String guess) {
// Implementation
}
}
object WordGuessingGame {
fun startNewGame(events : Sequence<DomainEvent>), gameId : String, wordToGuess : String) : Sequence<DomainEvent> {
// Implementation
}
fun guessWord(events : Sequence<DomainEvent>, guess : String) : Sequence<DomainEvent> {
// Implementation
}
}
Imagine that for a specific API you want to allow starting a new game and making a guess in the same request. Instead of changing your domain model,
you can use function composition! If you import org.occurrent.application.composition.command.StreamCommandComposition.composeCommands
you can do like this:
- Java
- Kotlin
String gameId = ...
String wordToGuess = ...
String guess = ...
applicationService.execute(gameId, composeCommands(
events -> WordGuessingGame.startNewGame(events, gameId, wordToGuess),
events -> WordGuessingGame.makeGuess(events, guess)
));
val gameId = ...
val wordToGuess = ...
val guess = ...
applicationService.execute(gameId, composeCommands(
{ events -> WordGuessingGame.startNewGame(events, gameId, wordToGuess) }
{ events -> WordGuessingGame.makeGuess(events, guess) }
))
If you’re using Kotlin you can also make use of the andThen
(infix) function for command composition (import org.occurrent.application.composition.command.andThen
):
applicationService.execute(gameId,
{ events -> WordGuessingGame.startNewGame(events, gameId, wordToGuess) }
andThen { events -> WordGuessingGame.makeGuess(events, guess) })
Events returned from WordGuessingGame.startNewGame(..)
will be appended to the event stream when calling WordGuessingGame.makeGuess(..)
and the new domain events
returned by the two functions will be merged and written in an atomic fashion to the event store.
The command composition library also contains some utilities for partial function application
that you can use to further enhance the example above (if you like). If you statically import partial
method from org.occurrent.application.composition.command.partial.PartialFunctionApplication
you can refactor the code above into this:
- Java
- Kotlin
String gameId = ...
String wordToGuess = ...
String guess = ...
applicationService.execute(gameId, composeCommands(
partial(WordGuessingGame::startNewGame, gameId, wordToGuess),
partial(WordGuessingGame::makeGuess, guess)
));
val gameId = ...
val wordToGuess = ...
val guess = ...
applicationService.execute(gameId, composeCommands(
WordGuessingGame::startNewGame.partial(gameId, wordToGuess)
WordGuessingGame::makeGuess.partial(guess)
))
With Kotlin, you can also use andThen
(described above) to do:
applicationService.execute(gameId,
WordGuessingGame::startNewGame.partial(gameId, wordToGuess)
andThen WordGuessingGame::makeGuess.partial(guess))
Command Conversion
If you have an application service that takes a higher-order function in the form of Function<Stream<DomainEvent>, Stream<DomainEvent>>
but your
domain model is defined with list’s (Function<List<DomainEvent, List<DomainEvent>
) then Occurrent provides means to easily convert between them. First you need
to depend on the Command Composition library:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>command-composition</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:command-composition:0.19.6'
libraryDependencies += "org.occurrent" % "command-composition" % "0.19.6"
@Grab(group='org.occurrent', module='command-composition', version='0.19.6')
[org.occurrent/command-composition "0.19.6"]
'org.occurrent:command-composition:jar:0.19.6'
<dependency org="org.occurrent" name="command-composition" rev="0.19.6" />
Let’s say you have a domain model defined like this:
- Java
- Kotlin
public class WordGuessingGame {
public List<DomainEvent> guessWord(List<DomainEvents> events, String guess) {
// Implementation
}
}
object WordGuessingGame {
fun guessWord(events : List<DomainEvent>, guess : String) : List<DomainEvent> {
// Implementation
}
}
But you application service takes a Function<Stream<DomainEvent>, Stream<DomainEvent>>
:
public class ApplicationService {
public void execute(String streamId, Function<Stream<DomainEvent>, Stream<DomainEvent>> functionThatCallsDomainModel) {
// Implementation
}
}
Then you can make use of the toStreamCommand
in org.occurrent.application.composition.command.CommandConversion
to call the domain function:
- Java
- Kotlin
String guess = ...
applicationService.execute(gameId, toStreamCommand( events -> WordGuessingGame.makeGuess(events, guess)));
val guess = ...
applicationService.execute(gameId, toStreamCommand { events -> WordGuessingGame.makeGuess(events, guess) } )
CloudEvent Conversion
To convert between domain events and cloud events you can use the cloud event converter API that’s shipped with Occurrent. This is optional, but components such as the application service and subscription dsl uses a cloud event converter to function.
If you’re only using an event store and subscriptions then you don’t need a cloud event converter (or you can roll your own).
All cloud event converters implements the org.occurrent.application.converter.CloudEventConverter
interface from the org.occurrent:cloudevent-converter
module (see custom cloudevent converter).
Generic CloudEvent Converter
This is a really simple cloud event converter to which you can pass two higher-order functions that converts to and from domain events respectively. To use it depend on:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>cloudevent-converter-generic</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:cloudevent-converter-generic:0.19.6'
libraryDependencies += "org.occurrent" % "cloudevent-converter-generic" % "0.19.6"
@Grab(group='org.occurrent', module='cloudevent-converter-generic', version='0.19.6')
[org.occurrent/cloudevent-converter-generic "0.19.6"]
'org.occurrent:cloudevent-converter-generic:jar:0.19.6'
<dependency org="org.occurrent" name="cloudevent-converter-generic" rev="0.19.6" />
For example:
Function<CloudEvent, DomainEvent> convertCloudEventToDomainEventFunction = .. // You implement this function
Function<DomainEvent, CloudEvent> convertDomainEventToCloudEventFunction = .. // You implement this function
CloudEventConverter<CloudEvent> cloudEventConverter = new GenericCloudEventConverter<>(convertCloudEventToDomainEventFunction, convertDomainEventToCloudEventFunction);
If your domain model is already using a CloudEvent
(and not a custom domain event) then you can just pass a Function.identity()
to the GenericCloudEventConverter
:
CloudEventConverter<CloudEvent> cloudEventConverter = new GenericCloudEventConverter<>(Function.identity(), Function.identity());
XStream CloudEvent Converter
This cloud event converter uses XStream to convert domain events to cloud events to XML and back. To use it, first depend on this module:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>cloudevent-converter-xstream</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:cloudevent-converter-xstream:0.19.6'
libraryDependencies += "org.occurrent" % "cloudevent-converter-xstream" % "0.19.6"
@Grab(group='org.occurrent', module='cloudevent-converter-xstream', version='0.19.6')
[org.occurrent/cloudevent-converter-xstream "0.19.6"]
'org.occurrent:cloudevent-converter-xstream:jar:0.19.6'
<dependency org="org.occurrent" name="cloudevent-converter-xstream" rev="0.19.6" />
Next you can instantiate it like this:
- Java
- Kotlin
XStream xStream = new XStream();
xStream.allowTypeHierarchy(MyDomainEvent.class);
URI cloudEventSource = URI.create("urn:company:domain")
XStreamCloudEventConverter<MyDomainEvent> cloudEventConverter = new XStreamCloudEventConverter<>(xStream, cloudEventSource);
val xStream = XStream().apply { allowTypeHierarchy(MyDomainEvent::class.java) }
val cloudEventSource = URI.create("urn:company:domain")
val cloudEventConverter = new XStreamCloudEventConverter<>(xStream, cloudEventSource)
You can also configure how different attributes of the domain event should be represented in the cloud event by using the builder, new XStreamCloudEventConverter.Builder<MyDomainEvent>().. build()
.
Jackson CloudEvent Converter
This cloud event converter uses Jackson to convert domain events to cloud events to JSON and back. To use it, first depend on this module:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>cloudevent-converter-jackson</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:cloudevent-converter-jackson:0.19.6'
libraryDependencies += "org.occurrent" % "cloudevent-converter-jackson" % "0.19.6"
@Grab(group='org.occurrent', module='cloudevent-converter-jackson', version='0.19.6')
[org.occurrent/cloudevent-converter-jackson "0.19.6"]
'org.occurrent:cloudevent-converter-jackson:jar:0.19.6'
<dependency org="org.occurrent" name="cloudevent-converter-jackson" rev="0.19.6" />
Next you can instantiate it like this:
- Java
- Kotlin
ObjectMapper objectMapper = new ObjectMapper();
URI cloudEventSource = URI.create("urn:company:domain")
JacksonCloudEventConverter<MyDomainEvent> cloudEventConverter = new JacksonCloudEventConverter<>(objectMapper, cloudEventSource);
val objectMapper = jacksonObjectMapper()
val cloudEventSource = URI.create("urn:company:domain")
val cloudEventConverter = new JacksonCloudEventConverter<>(objectMapper, cloudEventSource)
You can also configure how different attributes of the domain event should be represented in the cloud event by using the builder, new JacksonCloudEventConverter.Builder<MyDomainEvent>().. build()
.
In production, you almost certainly want to change the way the JacksonCloudEventConverter
generates the cloud event type from the domain event. By default, the cloud event type will be generated
from the fully-qualified class name of the domain event class type. I.e. if you do:
CloudEventConverter<MyDomainEvent> cloudEventConverter = new JacksonCloudEventConverter<>(objectMapper, cloudEventSource);
CloudEvent cloudEvent = cloudEventConverter.toCloudEvent(new SomeDomainEvent());
Then cloudEvent.getType()
will return com.mycompany.SomeDomainEvent
. Typically, you want to decouple the cloud event type from the fully-qualified name of the class. A better, but arguably still not optimal way, would be to make
cloudEvent.getType()
return SomeDomainEvent
instead. The JacksonCloudEventConverter
allows us to do this by using the builder:
CloudEventConverter<MyDomainEvent> cloudEventConverter = new JacksonCloudEventConverter.Builder<MyDomainEvent>()
.typeMapper(..) // Specify a custom way to map the domain event to a cloud event and vice versa
.build();
But when using Jackson, we can’t just configure the type mapper to return the “simple name” of the domain event class instead of the fully-qualified name. This is because there’s no generic way to derive the fully-qualified name from just the simple name. The fully-qualified name is needed in order for Jackson to map the cloud event back into a domain event. In order to work-around this you could implement your own type mapper (that you pass to the builder above) or create an instance of ReflectionCloudEventTypeMapper that knows how to convert the “simple name” cloud event type back into the domain event class. There are a couple of ways, the most simple one is probably this:
CloudEventTypeMapper<MyDomainEvent> typeMapper = ReflectionCloudEventTypeMapper.simple(MyDomainEvent.class);
This will create an instance of ReflectionCloudEventTypeMapper
that uses the simple name of the domain event as cloud event type. But the crucial thing is that when deriving the domain event type from the cloud event,
the ReflectionCloudEventTypeMapper
will prepend the package name of supplied domain event type (MyDomainEvent
) to the cloud event type, thus reconstructing the fully-qualified name of the class.
For this to work, all domain events must reside in exactly the same package as MyDomainEvent
.
Another approach would be to supply a higher-order function that knows how to map the cloud event type back into a domain event class.
CloudEventTypeMapper<MyDomainEvent> typeMapper = ReflectionCloudEventTypeMapper.simple(cloudEventType -> ...);
Again, this will create an instance of ReflectionCloudEventTypeMapper
that uses the simple name of the domain event as cloud event type, but
you are responsible to, somehow, map the cloud event type (cloudEventType
) back into a domain event class.
If you don’t want to use reflection or don’t want to couple the class name to the event name (which is recommended) you can roll your own custom CloudEventTypeMapper
by implementing the
org.occurrent.application.converter.typemapper.CloudEventTypeMapper
interface.
Custom CloudEvent Converter
To create a custom cloud event converter first depend on:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>cloudevent-converter-api</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:cloudevent-converter-api:0.19.6'
libraryDependencies += "org.occurrent" % "cloudevent-converter-api" % "0.19.6"
@Grab(group='org.occurrent', module='cloudevent-converter-api', version='0.19.6')
[org.occurrent/cloudevent-converter-api "0.19.6"]
'org.occurrent:cloudevent-converter-api:jar:0.19.6'
<dependency org="org.occurrent" name="cloudevent-converter-api" rev="0.19.6" />
Let’s have a look at a naive example of how we can create a custom converter that converts domain events to cloud events (and vice versa). This cloud event converter can then be used with the generic application service (the application service implementation provided by Occurrent) and other Occurrent components that requires a CloudEventConverter
.
Note that instead of using the code below you might as well use the Jackson CloudEvent Converter, this is just an example showing how you could roll your own.
import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import org.occurrent.application.converter.CloudEventConverter;
import java.io.IOException;
import java.net.URI;
import static java.time.ZoneOffset.UTC;
import static org.occurrent.functional.CheckedFunction.unchecked;
import static org.occurrent.time.TimeConversion.toLocalDateTime;
public class MyCloudEventConverter implements CloudEventConverter<DomainEvent> {
private final ObjectMapper objectMapper;
public MyCloudEventConverter(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
@Override
public CloudEvent toCloudEvent(DomainEvent e) {
try {
return CloudEventBuilder.v1()
.withId(e.getEventId())
.withSource(URI.create("urn:myapplication:streamtype"))
.withType(getCloudEventType(e))
.withTime(LocalDateTime.ofInstant(e.getDate().toInstant(), UTC).atOffset(UTC)
.truncatedTo(ChronoUnit.MILLIS))
.withSubject(e.getName())
.withDataContentType("application/json")
.withData(objectMapper.writeValueAsBytes(e))
.build();
} catch (JsonProcessingException jsonProcessingException) {
throw new RuntimeException(jsonProcessingException);
}
}
@Override
public DomainEvent toDomainEvent(CloudEvent cloudEvent) {
try {
return (DomainEvent) objectMapper.readValue(cloudEvent.getData().toBytes(), Class.forName(cloudEvent.getType()));
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
@Override
public String getCloudEventType(Class<? extends T> type) {
return type.getName();
}
}
To see what the attributes mean in the context of event sourcing refer to the CloudEvents documentation. You can also have a look at GenericApplicationServiceTest.java for an actual code example.
Note that if the data content type in the CloudEvent is specified as “application/json” (or a json compatible content-type) then Occurrent will automatically store it as Bson in a MongoDB event store.
The reason for this so that you’re able to query the data, either by the EventStoreQueries API, or manually using MongoDB queries.
In order to do this, the byte[]
passed to withData
, will be converted into a org.bson.Document
that is later written to the database. This is not optimal from a performance perspective.
A more performant option would be to make use of the io.cloudevents.core.data.PojoCloudEventData
class. This class implements the io.cloudevents.CloudEventData
interface and
allows passing a pre-baked Map
or org.bson.Document
instance to it. Then no additional conversion will need to take place! Here’s an example:
import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.data.PojoCloudEventData;
import org.bson.Document;
import java.io.IOException;
import java.net.URI;
import static java.time.ZoneOffset.UTC;
import static org.occurrent.functional.CheckedFunction.unchecked;
import static org.occurrent.time.TimeConversion.toLocalDateTime;
import static java.time.temporal.ChronoUnit.MILLIS;
public class MyCloudEventConverter implements CloudEventConverter<DomainEvent> {
private final ObjectMapper objectMapper;
public MyCloudEventConverter(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
@Override
public CloudEvent toCloudEvent(DomainEvent e) {
// Convert the data in the domain event into a Document
Map<String, Object> eventData = convertDataInDomainEventToMap(e);
return CloudEventBuilder.v1()
.withId(e.getEventId())
.withSource(URI.create("http://name"))
.withType(getCloudEventType(e))
.withTime(LocalDateTime.ofInstant(e.getDate().toInstant(), UTC).atOffset(UTC)
.truncatedTo(MILLIS))
.withSubject(e.getName())
.withDataContentType("application/json")
// Use the "eventData" map to create an instance of PojoCloudEventData.
// If an event store implementation doesn't know how to handle "Map" data,
// it'll call the higher-order function that converts the map into byte[]
// (objectMapper::writeValueAsBytes), that it _has_ to understand.
// But since all Occurrent event stores currently knows how to handle maps,
// the objectMapper::writeValueAsBytes method will never be called.
.withData(PojoCloudEventData.wrap(eventData, objectMapper::writeValueAsBytes))
.build();
}
@Override
public DomainEvent toDomainEvent(CloudEvent cloudEvent) {
CloudEventData cloudEventData = cloudEvent.getData();
if (cloudEventData instanceof PojoCloudEventData && cloudEventData.getValue() instanceof Map) {
Map<String, Object> eventData = ((PojoCloudEventData<Map<String, Object>>) cloudEventData).getValue();
return convertToDomainEvent(cloudEvent, eventData);
} else {
return objectMapper.readValue(cloudEventData.toBytes(), DomainEvent.class); // try-catch omitted
}
}
@Override
public String getCloudEventType(Class<? extends T> type) {
return type.getSimpleName();
}
private static Map<String, Object> convertDataInDomainEventToDocument(DomainEvent e) {
// Convert the domain event into a Map
Map<String, Object> data = new HashMap<String, Object>();
if (e instanceof GameStarted) {
data.put("type", "GameStarted");
// Put the rest of the values
} else if (...) {
// More events
}
return map;
}
private static DomainEvent convertToDomainEvent(CloudEvent cloudEvent, Map<String, Object> data) {
// Re-construct the domain event instance from the cloud event and data
switch ((String) data.get("type")) {
case "GameStarted" -> // Convert map to GameStartedEvent
break;
...
}
}
}
Application Service
Occurrent provides a generic application service that is a good starting point for most use cases. First add the module as a dependency to your project:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>application-service-blocking</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:application-service-blocking:0.19.6'
libraryDependencies += "org.occurrent" % "application-service-blocking" % "0.19.6"
@Grab(group='org.occurrent', module='application-service-blocking', version='0.19.6')
[org.occurrent/application-service-blocking "0.19.6"]
'org.occurrent:application-service-blocking:jar:0.19.6'
<dependency org="org.occurrent" name="application-service-blocking" rev="0.19.6" />
This module provides an interface, org.occurrent.application.service.blocking.ApplicationService
, and a default implementation,
org.occurrent.application.service.blocking.implementation.GenericApplicationService
. The GenericApplicationService
takes an EventStore
and
a org.occurrent.application.converter.CloudEventConverter
implementation as parameters. The latter is used to convert domain events to and from
cloud events when loaded/written to the event store. There’s a default implementation that you may decide to use called,
org.occurrent.application.converter.implementation.GenericCloudEventConverter
available in the org.occurrent:cloudevent-converter-generic
module.
You can see an example in the next section.
As of version 0.11.0, the GenericApplicationService
also takes a RetryStrategy as an optional third parameter.
By default, the retry strategy uses exponential backoff starting with 100 ms and progressively go up to max 2 seconds wait time between
each retry, if a WriteConditionNotFulfilledException
is caught (see write condition docs).
It will, again by default, only retry 5 times before giving up, rethrowing the original exception. You can override the default strategy
by calling new GenericApplicationService(eventStore, cloudEventConverter, retryStrategy)
.
Use new GenericApplicationService(eventStore, cloudEventConverter, RetryStrategy.none())
to disable retry. This is also useful if you
want to use another retry library.
Using the Application Service
Now you can instantiate the (blocking) GenericApplicationService
:
- Java
- Kotlin
EventStore eventStore = ..
CloudEventConverter<DomainEvent> cloudEventConverter = ..
ApplicationService<DomainEvent> applicationService = new GenericApplicationService<>(eventStore, cloudEventConverter);
val eventStore : EventStore = ..
val cloudEventConverter : CloudEventConverter<DomainEvent> = ..
val applicationService : ApplicationService<DomainEvent> = GenericApplicationService(eventStore, cloudEventConverter)
You’re now ready to use the generic application service in your application.
As an example let’s say you have a domain model with a method defined like this:
public class WordGuessingGame {
public static Stream<DomainEvent> guessWord(Stream<DomainEvent> events, String guess) {
// Implementation
}
}
You can call it using the application service:
- Java
- Kotlin
applicationService.execute(gameId, events -> WordGuessingGame.guessWord(events, guess));
applicationService.execute(gameId) { events ->
WordGuessingGame.guessWord(events, guess)
}
Application Service Side-Effects
The GenericApplicationService
supports executing side-effects after the events returned from the domain model have been written to the event store.
This is useful if you need to, for example, update a view synchronously after events have been written to the event store. Note that to perform side-effects (or policies)
asynchronously you should use a subscription. As an example, consider that you want to synchronously register a game as ongoing when it is started.
It may be defined like this:
- Java
- Kotlin
public class RegisterOngoingGame {
private final DatabaseApi someDatabaseApi;
pubic RegisterOngoingGame(DatabaseApi someDatabaseApi) {
this.someDatabaseApi = someDatabaseApi;
}
public void registerGameAsOngoingWhenGameWasStarted(GameWasStarted event) {
// Add the id of the game started event to a set to handle duplicates and idempotency.
someDatabaseApi.addToSet("ongoingGames", Map.of("gameId", e.gameId(), "date", e.getDate()));
}
}
class RegisterOngoingGame(private val someDatabaseApi : DatabaseApi) {
fun registerGameAsOngoingWhenGameWasStarted(event : GameWasStarted) {
// Add the id of the game started event to a set to handle duplicates and idempotency.
someDatabaseApi.addToSet("ongoingGames", Map.of("gameId", e.gameId(), "date", e.getDate()));
}
}
The reason for doing this synchronously is, for example, if you have a REST API and the player expects the ongoing games view to be updated once the “start game” command has executed. This can be achieved by other means (RSocket, Websockets, server-sent events, polling) but synchronous updates is simple and works quite well in many cases.
Now that we have the code that registers ongoing games, we can call it from our from the application service:
- Java
- Kotlin
RegisterOngoingGame registerOngoingGame = ..
applicationService.execute(gameId, events -> WordGuessingGame.guessWord(events, guess), events -> {
events.filter(event -> event instanceof GameWasStarted)
.findFirst()
.map(GameWasStarted.class::cast)
.ifPresent(registerOngoingGame::registerGameAsOngoingWhenGameWasStarted)
});
val registerOngoingGame : RegisterOngoingGame = ..
applicationService.execute(gameId, { events -> WordGuessingGame.guessWord(events, guess) }) { events ->
val gameWasStarted = events.find { event is GameWasStarted }
if(gameWasStarted != null) {
registerOngoingGame.registerGameAsOngoingWhenGameWasStarted(gameWasStarted)
}
}
Voila! Now registerGameAsOngoingWhenGameWasStarted
will be called after the events returned from WordGuessingGame.guessWord(..)
is written to the event store.
You can however improve on the code above and make use of the executePolicy
method shipped with the application service in the org.occurrent.application.service.blocking.PolicySideEffect
.
The code can then be refactored to this:
applicationService.execute(gameId, events -> WordGuessingGame.guessWord(events, guess), executePolicy(GameWasStarted.class, registerOngoingGame::registerGameAsOngoingWhenGameWasStarted));
If using the Kotlin extension functions (org.occurrent.application.service.blocking.executePolicy
) you can write the code like this:
applicationService.execute(gameId, { events -> WordGuessingGame.guessWord(events, guess) }, executePolicy<GameWasStarted>(registerOngoingGame::registerGameAsOngoingWhenGameWasStarted))
Policies can also be composed:
- Java
- Kotlin
RegisterOngoingGame registerOngoingGame = ..
RemoveFromOngoingGamesWhenGameEnded removeFromOngoingGamesWhenGameEnded = ..
applicationService.execute(gameId, events -> WordGuessingGame.guessWord(events, guess),
executePolicy(GameWasStarted.class, registerOngoingGame::registerGameAsOngoingWhenGameWasStarted)
.andThenExecuteAnotherPolicy(removeFromOngoingGamesWhenGameEnded::removeFromOngoingGamesWhenGameEnded));
val registerOngoingGame : RegisterOngoingGame = ..
val removeFromOngoingGamesWhenGameEnded : RemoveFromOngoingGamesWhenGameEnded = ..
applicationService.execute(gameId, { events -> WordGuessingGame.guessWord(events, guess) },
executePolicies(registerOngoingGame::registerGameAsOngoingWhenGameWasStarted,
removeFromOngoingGamesWhenGameEnded::removeFromOngoingGamesWhenGameEnded))
Application Service Transactional Side-Effects
In the example above, writing the events to the event store and executing policies is not an atomic operation. If your app crashes after the call to registerOngoingGame::registerGameAsOngoingWhenGameWasStarted
but before removeFromOngoingGamesWhenGameEnded::removeFromOngoingGamesWhenGameEnded
, you will need to handle idempotency. But if your policies/side-effects are writing data to the same database as the event store
you can make use of transactions to write everything atomically! This is very easy if you’re using a Spring EventStore. What you need to do is to wrap the ApplicationService
provided
by Occurrent in your own application service, something like this:
- Java
- Kotlin
@Service
public class CustomApplicationServiceImpl implements ApplicationService<DomainEvent> {
private final GenericApplicationService<DomainEvent> occurrentApplicationService;
public CustomApplicationService(GenericApplicationService<DomainEvent> occurrentApplicationService) {
this.occurrentApplicationService = occurrentApplicationService;
}
@Transactional
@Override
public void execute(String gameId, Function<Stream<DomainEvent>, Stream<DomainEvent>> functionThatCallsDomainModel, Consumer<Stream<DomainEvent>> sideEffect) {
occurrentApplicationService.execute(gameId, functionThatCallsDomainModel, sideEffect);
}
}
@Service
class CustomApplicationServiceImpl(val occurrentApplicationService: GenericApplicationService<DomainEvent>) : ApplicationService<DomainEvent> {
@Transactional
override fun execute(gameId : String, functionThatCallsDomainModel: Function<Stream<DomainEvent>, Stream<DomainEvent>> , sideEffect : Consumer<Stream<DomainEvent>>) {
occurrentApplicationService.execute(gameId, functionThatCallsDomainModel, sideEffect)
}
}
Given that you’ve defined a MongoTransactionManager
in your Spring Boot configuration (and using this when creating your event store instance) the side-effects and events
are written atomically in the same transaction!
Application Service Kotlin Extensions
If you’re using Kotlin chances are that your domain model is using a Sequence
instead of a java Stream
:
object WordGuessingGame {
fun guessWord(events : Sequence<DomainEvent>, guess : String) : Sequence<DomainEvent> {
// Implementation
}
}
Occurrent provides a set of extension functions for Kotlin in the application service module:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>application-service-blocking</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:application-service-blocking:0.19.6'
libraryDependencies += "org.occurrent" % "application-service-blocking" % "0.19.6"
@Grab(group='org.occurrent', module='application-service-blocking', version='0.19.6')
[org.occurrent/application-service-blocking "0.19.6"]
'org.occurrent:application-service-blocking:jar:0.19.6'
<dependency org="org.occurrent" name="application-service-blocking" rev="0.19.6" />
You can then use one of the org.occurrent.application.service.blocking.execute
extension functions to do:
applicationService.execute(gameId) { events : Sequence<DomainEvent> ->
WordGuessingGame.guessWord(events, guess)
}
Sagas
A “saga” can be used to represent and coordinate a long-lived business transaction/process (where “long-lived” is kind of arbitrary). This is an advanced subject and you should try to avoid sagas if there are other means available to solve the problem (for example use policies if they are sufficient). Occurrent doesn’t provide or enforce any specific Saga implementation. But since Occurrent is a library you can hook in already existing solutions, for example:
- Temporal - Open source microservices orchestration platform for running mission critical code at any scale.
- zio-saga - If you’re using Scala and zio (there’s also a cats implementation).
- Apache Camel Saga - If you’re using Java and don’t mind bringing in Apache Camel as a dependency.
- nflow - Battle-proven solution for orchestrating business processes in Java.
- saga-coordinator-java - Saga Coordinator as a Finite State Machine (FSM) in Java.
- Baker - Baker is a library that reduces the effort to orchestrate (micro)service-based process flows.
- Use the routing-slip pattern from the Enterprise Integration Patterns book.
- Represent sagas as todo lists. This is described in the event modeling documentation in the automation section.
The way to integrate Occurrent with any of these libraries/frameworks/patterns is to create a subscription that forwards the events written to the event store to the preferred library/framework/view.
Policy
A policy (aka reaction/trigger) can be used to deal with workflows such as “whenever this happens, do that”. For example, whenever a game is won, send an email to the winner. For simple workflows like this there’s typically no need for a full-fledged saga.
Asynchronous Policy
In Occurrent, you can create asynchronous policies by creating a subscription. Let’s consider the example above:
- Java
- Kotlin
public class WhenGameWonThenSendEmailToWinnerPolicy {
private final SubscriptionModel subscriptionModel;
private final EmailClient emailClient;
private final Players players;
public WhenGameWonThenSendEmailToWinnerPolicy(SubscriptionModel subscriptionModel, EmailClient emailClient, Players players) {
this.subscriptionModel = subscriptionModel;
this.emailClient = emailClient;
this.players = players;
}
@PostConstruct
public void whenGameWonThenSendEmailToWinner() {
subscriptionModel.subscribe("whenGameWonThenSendEmailToWinnerPolicy", filter(type("GameWon")), cloudEvent -> {
String playerEmailAddress = players.whatIsTheEmailAddressOfPlayer(playerIdIn(cloudEvent.getData()));
emailClient.sendEmail(playerEmailAddress, "You won, yaay!");
}
}
}
class WhenGameWonThenSendEmailToWinnerPolicy(val subscriptionModel : SubscriptionModel,
val emailClient : EmailClient, val players : Players) {
@PostConstruct
fun whenGameWonThenSendEmailToWinner() =
subscriptionModel.subscribe("whenGameWonThenSendEmailToWinnerPolicy", filter(type("GameWon")) { cloudEvent ->
val playerEmailAddress = players.whatIsTheEmailAddressOfPlayer(playerIdIn(cloudEvent.getData()))
emailClient.sendEmail(playerEmailAddress, "You won, yaay!")
}
}
You could also create a generic policy that simply forwards all events to another piece of infrastructure. For example, you may wish to forward all events to rabbitmq (by publishing them) or Spring’s event infrastructure, and then create policies that subscribes to events from these systems instead. There’s an example in the github repository that shows an example of how one can achieve this.
You may also want to look into the “todo-list” pattern described in the automation section on the in the event modeling website.
Synchronous Policy
In some cases, for example if you have a simple website and you want views to be updated when a command is dispatched by a REST API, it can be useful to update a policy in a synchronous fashion. The application service provided by Occurrent allows for this, please see the application service documentation for an example.
Snapshots
Snapshotting is an optimization technique that can be applied if it takes too long to derive the current state from an event stream for each command. There are several ways to do this and Occurrent doesn’t enforce any particular strategy. One strategy is to use so-called “snapshot events” (special events that contains a pre-calculated view of the state of an event stream at a particular time) and another technique is to write snapshots to another datastore than the event store.
The application service need to be modified to first load the up the snapshot and then load events that have not yet been materialized in the snapshot (if any).
Synchronous Snapshots
With Occurrent, you can trade-off write speed for understandability. For example, let’s say that you want to update the snapshot on every write and it should be consistent with the writes to the event store. One way to do this is to use Spring’s transactional support:
- Java
- Kotlin
public class ApplicationService {
private final EventStore eventStore;
private final SnapshotRepository snapshotRepository;
public ApplicationService(EventStore eventStore, SnapshotRepository snapshotRepository) {
this.eventStore = eventStore;
this.snapshotRepository = snapshotRepository;
}
@Transactional
public void execute(String streamId, BiFunction<Snapshot, Stream<CloudEvent>, Stream<CloudEvent>> functionThatCallsDomainModel) {
// Read snapshot from a the snapshot repsitory
Snapshot snapshot = snapshotRepsitory.findByStreamId(streamId);
long snapshotVersion = snapshot.version();
// Read all events for "streamId" from snapshotVersion
EventStream<CloudEvent> eventStream = eventStore.read(streamId, snapshotVersion, Long.MAX_VALUE);
// Call a pure function from the domain model which returns a Stream of new events
List<CloudEvent> newEvents = functionThatCallsDomainModel.apply(snapshot.state(), eventStream).collect(Collectors.toList());
// Convert domain events to cloud events and write them to the event store
eventStore.write(streamId, eventStream.version(), newEvents.stream());
// Update the snapshot
Snapshot updatedSnapshot = snapshot.updateFrom(newEvents.stream(), eventStream.version());
snapshotRepsitory.save(updatedSnapshot);
}
}
class ApplicationService(val eventStore : EventStore, val snapshotRepository : SnapshotRepository) {
@Transactional
fun execute(String streamId, functionThatCallsDomainModel : (Snapshot, Stream<CloudEvent>) -> Stream<CloudEvent>) {
// Read snapshot from a the snapshot repsitory
val snapshot : Snapshot = snapshotRepsitory.findByStreamId(streamId)
long snapshotVersion = snapshot.streamVersion()
// Read all events for "streamId" from snapshotVersion
val eventStream = eventStore.read(streamId, snapshotVersion, Long.MAX_VALUE)
// Call a pure function from the domain model which returns a Stream of new events
val newEvents = functionThatCallsDomainModel(snapshot.state(), eventStream).collect(Collectors.toList())
// Convert domain events to cloud events and write them to the event store
eventStore.write(streamId, eventStream.version(), newEvents.stream())
// Update the snapshot
val updatedSnapshot = snapshot.updateFrom(newEvents.stream(), eventStream.version())
snapshotRepsitory.save(updatedSnapshot)
}
}
It’s quite likely that you don’t need to update the snapshot in every write (like it’s done in the example above). If you only wish to update the snapshot for every third event then you could do:
if (eventStream.version() - snapshot.version() >= 3) {
Snapshot updatedSnapshot = snapshot.updateFrom(newEvents.stream(), eventStream.version());
snapshotRepsitory.save(updatedSnapshot);
}
Asynchronous Snapshots
As an alternative to synchronous and fully-consistent snapshots, you can update snapshots asynchronously. You do this by creating a subscription that updates the snapshot. For example:
- Java
- Kotlin
public class UpdateSnapshotWhenNewEventsAreWrittenToEventStore {
private final SubscriptionModel subscriptionModel;
private final SnapshotRepository snapshotRepository;
public UpdateSnapshotWhenNewEventsAreWrittenToEventStore(SubscriptionModel subscriptionModel, SnapshotRepository snapshotRepository) {
this.subscription = subscription;
this.snapshotRepository = snapshotRepository;
}
@PostConstruct
public void updateSnapshotWhenNewEventsAreWrittenToEventStore() {
subscription.subscribe("updateSnapshots", cloudEvent -> {
String streamId = OccurrentExtensionGetter.getStreamId(cloudEvent);
long streamVersion = OccurrentExtensionGetter.getStreamVersion(cloudEvent);
Snapshot snapshot = snapshotRepository.findByStreamId(streamId);
snapshot.updateFrom(cloudEvent, streamVersion);
snapshotRepository.save(snapshot);
}
}
}
class UpdateSnapshotWhenNewEventsAreWrittenToEventStore(val subscription : BlockingSubscription,
val snapshotRepository : SnapshotRepository) {
@PostConstruct
fun updateSnapshotWhenNewEventsAreWrittenToEventStore() {
subscription.subscribe("updateSnapshots", { cloudEvent ->
String streamId = OccurrentExtensionGetter.getStreamId(cloudEvent)
long streamVersion = OccurrentExtensionGetter.getStreamVersion(cloudEvent)
Snapshot snapshot = snapshotRepository.findByStreamId(streamId)
snapshot.updateFrom(cloudEvent, streamVersion)
snapshotRepository.save(snapshot)
}
}
}
It’s quite likely that you don’t need to update the snapshot in every write (like it’s done in the example above). If you only wish to update the snapshot for every third event then you could do:
if (streamVersion - snapshot.version() >= 3) {
Snapshot updatedSnapshot = snapshot.updateFrom(newEvents.stream(), eventStream.version());
snapshotRepsitory.save(updatedSnapshot);
}
Closing the Books
This is a pattern that can be applied instead of updating snapshots for every n
event. The idea is to try to keep event streams short and
instead create snapshots periodically. For example, once every month we run a job that creates snapshots for certain event streams. This is especially well suited for problem domains where “closing the books”
is a concept in the domain (such as accounting).
Deadlines
Deadlines (aka scheduling, alarm clock) is a very handy technique to schedule to something to be executed in the future. Imagine, for example, a multiplayer game (like word guessing game shown in previous examples), where we want to game to end automatically after 10 hours of inactivity. This means that as soon as a player has made a guess, we’d like to schedule a “timeout game command” to be executed after 10 hours.
The way it works in Occurrent is that you schedule a Deadline (org.occurrent.deadline.api.blocking.Deadline
) using a DeadlineScheduler (org.occurrent.deadline.api.blocking.DeadlineScheduler
) implementation.
The Deadline is a date/time in the future when the deadline is up. To handle the deadline, you also register a DeadlineConsumer (org.occurrent.deadline.api.blocking.DeadlineConsumer
) to a
DeadlineConsumerRegistry (org.occurrent.deadline.api.blocking.DeadlineConsumerRegistry
) implementation, and it’ll be invoked when a deadline is up. For example:
// In some method we schedule a deadline two hours from now with data "hello world"
var deadlineId = UUID.randomUUID();
var deadlineCategory = "hello-world";
var deadline = Deadline.afterHours(2);
deadlineScheduler.schedule(deadlineId, deadlineCategory, deadline, "hello world");
// In some other method, during application startup, we register a deadline consumer to the registry for the "hello-world" deadline category
deadlineConsumerRegistry.register("hello-world", (deadlineId, deadlineCategory, deadline, data) -> System.out.println(data));
In the example above, the deadline consumer will print “hello world” after 2 hours.
There are two implementations of DeadlineScheduler
and DeadlineConsumerRegistry
, JobRunr and InMemory.
JobRunr Deadline Scheduler
This is a persistent (meaning that your application can be restarted and deadlines are still around) DeadlineScheduler based on JobRunr. To get started, depend on:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>deadline-jobrunr</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:deadline-jobrunr:0.19.6'
libraryDependencies += "org.occurrent" % "deadline-jobrunr" % "0.19.6"
@Grab(group='org.occurrent', module='deadline-jobrunr', version='0.19.6')
[org.occurrent/deadline-jobrunr "0.19.6"]
'org.occurrent:deadline-jobrunr:jar:0.19.6'
<dependency org="org.occurrent" name="deadline-jobrunr" rev="0.19.6" />
You then need to create an instance of org.jobrunr.scheduling.JobRequestScheduler
(see JobRunr documentation for different ways of doing this).
Once you have a JobRequestScheduler
you need to create an instance of org.occurrent.deadline.jobrunr.JobRunrDeadlineScheduler
:
JobRequestScheduler jobRequestScheduler = ..
DeadlineScheduler deadlineScheduler = new JobRunrDeadlineScheduler(jobRequestScheduler);
You also need to create an instance of org.occurrent.deadline.jobrunr.JobRunrDeadlineConsumerRegistry
:
DeadlineConsumerRegistry deadlineConsumerRegistry = new JobRunrDeadlineConsumerRegistry();
You register so-called “deadline consumers” to the DeadlineConsumerRegistry for a certain “category” (see example above). A deadline consumer will be invoked once a deadline is up.
Note that you can only have one deadline consumer instance per category. You want to register your deadline consumer everytime your application starts up. If you’re using Spring, you can, for example, do this
using a @PostConstructor
method:
@PostConstruct
void registerDeadlineConsumersOnApplicationStart() {
deadlineConsumerRegistry.register("CancelPayment", (id, category, deadline, data) -> {
CancelPayment cancelPayment = (CancelPayment) data;
paymentApi.cancel(cancelPayment.getPaymentId());
}):
}
The example above will register a deadline consumer for the “CancelPayment” category and call an imaginary api (paymentApi
) to cancel the payment. The deadline consumer will be called by Occurrent when the scheduled deadline is up. Here’s an example of you can schedule this deadline using
the JobRunrDeadlineConsumerRegistry
, but first lets see what CancelPayment
looks like:
public class CancelPayment {
private String paymentId;
CancelPayment() {
}
public CancelPayment(String paymentId) {
this.paymentId = paymentId;
}
public void setPaymentId(String paymentId) {
this.paymentId = paymentId;
}
public String getPaymentId() {
return paymentId;
}
}
As you can see it’s a regular Java POJO. This is very important since JobRunr needs to serialize/de-serialize this class to the database. Typically, this is done using Jackson (so it’s fine to use Jackson annotations etc), but JobRunr has support for other mappers as well.
Now lets see how we can schedule a “CancelPayment”:
String paymentId = ...
deadlineScheduler.schedule(UUID.randomUUID(), "CancelPayment", Deadline.afterWeeks(2), new CancelPayment(paymentId));
This will schedule a deadline after 2 weeks, that’ll be picked-up by the deadline consumer registered in the registerDeadlineConsumersOnApplicationStart
method. Note that in this case, the class (CancelPayment
) has the same name as the category, but this is not required.
Here’s an example of you can setup Occurrent Deadline Scheduling in Spring Boot:
@Configuration
public class DeadlineSpringConfig {
@Bean
JobRunrConfigurationResult initJobRunr(ApplicationContext applicationContext, MongoClient mongoClient,
@Value("${spring.data.mongodb.uri}") String mongoUri) {
var connectionString = new ConnectionString(mongoUri);
var database = connectionString.getDatabase();
return JobRunr.configure()
.useJobActivator(applicationContext::getBean)
.useStorageProvider(new MongoDBStorageProvider(mongoClient, database, "jobrunr-"))
.useBackgroundJobServer()
.useDashboard(
JobRunrDashboardWebServerConfiguration
.usingStandardDashboardConfiguration()
.andPort(8082)
.andAllowAnonymousDataUsage(false)
)
.initialize();
}
@PostConstruct
void destroyJobRunnerOnShutdown() {
JobRunr.destroy();
}
@Bean
DeadlineScheduler deadlineScheduler(JobRunrConfigurationResult jobRunrConfigurationResult) {
return new JobRunrDeadlineScheduler(jobRunrConfigurationResult.getJobRequestScheduler());
}
@Bean
DeadlineConsumerRegistry deadlineConsumerRegistry() {
return new JobRunrDeadlineConsumerRegistry();
}
}
Have a look at JobRunr for more configuration options.
In-Memory Deadline Scheduler
This is an in-memory, non-persistent (meaning that scheduled deadlines will be lost on application restart), DeadlineScheduler. To get started, depend on:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>deadline-inmemory</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:deadline-inmemory:0.19.6'
libraryDependencies += "org.occurrent" % "deadline-inmemory" % "0.19.6"
@Grab(group='org.occurrent', module='deadline-inmemory', version='0.19.6')
[org.occurrent/deadline-inmemory "0.19.6"]
'org.occurrent:deadline-inmemory:jar:0.19.6'
<dependency org="org.occurrent" name="deadline-inmemory" rev="0.19.6" />
Next, you need to create in instance of org.occurrent.deadline.inmemory.InMemoryDeadlineScheduler
and org.occurrent.deadline.inmemory.InMemoryDeadlineConsumerRegistry
. In order for these two components to communicate with each other,
you also need to provide an instance of a e java.util.concurrent.BlockingDeque
to the constructor. Here’s an example:
BlockingDeque<Object> queue = new LinkedBlockingDeque<>();
DeadlineConsumerRegistry deadlineConsumerRegistry = new InMemoryDeadlineConsumerRegistry(queue);
DeadlineConsumerRegistry deadlineScheduler = new InMemoryDeadlineScheduler(queue);
You can configure things, such as poll interval and retry strategy, for InMemoryDeadlineConsumerRegistry
by supplying an instance of org.occurrent.deadline.inmemory.InMemoryDeadlineConsumerRegistry$Config
as the second constructor argument:
new InMemoryDeadlineConsumerRegistry(queue, new Config().pollIntervalMillis(300).retryStrategy(RetryStrategy.fixed(Duration.of(2, SECONDS))));
Note that it’s very important to call shutdown
on both InMemoryDeadlineConsumerRegistry
and InMemoryDeadlineScheduler
on application/test end.
For usage examples, see Deadlines and JobRunr Scheduler.
Other Ways of Expressing Deadlines
If you don’t want to use any of the Occurrent libraries for deadline scheduling, or if you’re looking for more features that are not (yet) available, you can use other libraries from the Java ecosystem, such as:
- JobRunr - An easy way to perform background processing in Java. Distributed and backed by persistent storage.
- Quartz - Can be used to create simple or complex schedules for executing tens, hundreds, or even tens-of-thousands of jobs.
- db-scheduler - Task-scheduler for Java that was inspired by the need for a clustered
java.util.concurrent.ScheduledExecutorService
simpler than Quartz. - Spring Scheduling - Worth looking into if you’re already using Spring.
Getting started
Getting started with Occurrent involves these steps:
- Choose an underlying datastore for an event store. Luckily there are only two choices at the moment, MongoDB and an in-memory implementation. Hopefully this will be a more difficult decision in the future :)
- Once a datastore has been decided, it’s time to choose an EventStore implementation for this datastore since there may be more than one.
- If you need subscriptions (i.e. the ability to subscribe to changes from an EventStore) then you need to pick a library that implements this for the datastore that you’ve chosen. Again, there may be several implementations to choose from.
- If a subscriber needs to be able to continue from where it left off on application restart, it’s worth looking into a so called position storage library. These libraries provide means to automatically (or selectively) store the position for a subscriber to a datastore. Note that the datastore that stores this position can be a different datastore than the one used as EventStore. For example, you can use MongoDB as EventStore but store subscription positions in Redis.
- You’re now good to go, but you may also want to look into more higher-level components if you don’t have the need to role your own. We recommend looking into:
Choosing An EventStore
There are currently two different datastores to choose from, MongoDB and In-Memory.
MongoDB
Uses MongoDB, version 4.2 or above, as the underlying datastore for the CloudEvents. All implementations use transactions to guarantee consistent writes (see WriteCondition). Each EventStore will automatically create a few indexes on startup to allow for fast consistent writes, optimistic concurrency control and to avoid duplicated events. These indexes can also be used in queries against the EventStore (see EventStoreQueries).
There are three different MongoDB EventStore implementations to choose from:
MongoDB Schema
All MongoDB EventStore implementations tries to stay as close as possible to the CloudEvent’s specification even in the persitence layer.
Occurrent, by default, automatically adds a custom “Occurrent extension” to each cloud event that is written to an EventStore
.
The Occurrent CloudEvent Extension consists of these attributes:
Attribute Name | Type | Description |
---|---|---|
streamid |
String | An id that uniquely identifies a particular event stream. It’s used to determine which events belong to which stream. |
streamversion |
Long | The id of the stream version for a particular event. It’s used for optimistic concurrency control. |
A json schema describing a complete Occurrent CloudEvent, as it will be persisted to a MongoDB collection, can be found here (a “raw” cloud event json schema can be found here for comparison).
Note that MongoDB will automatically add an _id field (which is not used by Occurrent).
The reason why the CloudEvent id
attribute is not stored as _id
in MongoDB is that the id
of a CloudEvent is not globally unique!
The combination of id
and source
is a globally unique CloudEvent. Note also that _id
will not be included when the CloudEvent
is read from an EventStore
.
Here’s an example of what you can expect to see in the “events” collection when storing events in an EventStore
backed by MongoDB
(given that TimeRepresentation
is set to DATE
):
{
"_id : ObjectId("5f4112a348b8da5305e41f57"),
"specversion" : "1.0",
"id" : "bdb8481f-9e8e-443b-80a4-5ef787f0f227",
"source" : "urn:occurrent:domain:numberguessinggame",
"type" : "NumberGuessingGameWasStarted",
"subject" : "a1fc6ba1-7cd4-45cf-8dcc-b357fe23956d",
"time" : ISODate("2020-08-22T14:42:11.712Z"),
"data" : {
"secretNumberToGuess" : 8,
"startedBy" : "003ab97b-df79-4bf1-8c0c-08a5dd3701cf",
"maxNumberOfGuesses" : 5
},
"streamid" : "a1fc6ba1-7cd4-45cf-8dcc-b357fe23956d",
"streamversion" : NumberLong(1)
}
{
"_id" : ObjectId("5f4112a548b8da5305e41f58"),
"specversion" : "1.0",
"id" : "c1bfc3a5-1716-43ae-88a6-297189b1b5c7",
"source" : "urn:occurrent:domain:numberguessinggame",
"type" : "PlayerGuessedANumberThatWasTooSmall",
"subject" : "a1fc6ba1-7cd4-45cf-8dcc-b357fe23956d",
"time" : ISODate("2020-08-22T14:42:13.336Z"),
"data" : {
"guessedNumber" : 1,
"playerId" : "003ab97b-df79-4bf1-8c0c-08a5dd3701cf"
},
"streamid" : "a1fc6ba1-7cd4-45cf-8dcc-b357fe23956d",
"streamversion" : NumberLong(2)
}
MongoDB Time Representation
The CloudEvents specification says that the time attribute, if present, must adhere to the RFC 3339 specification.
To accommodate this in MongoDB, the time
attribute must be persisted as a String
. This by itself is not a problem, a problem only arise
if you want to make time-based queries on the events persisted to a MongoDB-backed EventStore
(using the EventStoreQueries interface).
This is, quite obviously, because time-based queries on String
’s are suboptimal (to say the least) and may lead to surprising results.
What we would like to do is to persist the time
attribute as a Date
in MongoDB, but MongoDB internally represents a Date with only millisecond resolution
(see here) and then the CloudEvent cannot be compliant with the RFC 3339 specification in all circumstances.
Because of the reasons described above, users of a MongoDB-backed EventStore implementation, must decide how the time
attribute is to be represented in MongoDB
when instantiating an EventStore implementation. This is done by passing a value from the org.occurrent.mongodb.timerepresentation.TimeRepresentation
enum to an
EventStoreConfig
object that is then passed to the EventStore implementation. TimeRepresentation
has these values:
Value | Description |
---|---|
RFC_3339_STRING |
Persist the time attribute as an RFC 3339 string. This string is able to represent both nanoseconds and a timezone so this is recommended for apps that need to store this information or if you are uncertain of whether this is required in the future. |
DATE |
Persist the time attribute as a MongoDB Date. The benefit of using this approach is that you can do range queries etc on the “time” field on the cloud event. This can be really useful for certain types on analytics or projections (such as show the 10 latest number of started games) without writing any custom code. |
Note that if you choose to go with RFC_3339_STRING
you always have the option of adding a custom attribute, named for example “date”, in which you represent the “time” attribute as a Date
when writing the events to an EventStore
.
This way you have the ability to use the “time” attribute to re-construct the CloudEvent time attribute exactly as well as the ability to do custom time-queries on the “date” attribute. However, you cannot use the methods involving time-based queries when using the EventStoreQueries interface.
Important: There’s yet another option! If you don’t need nanotime precision (i.e you’re fine with millisecond precision) and you’re OK with always representing the “time” attribute in UTC, then you can use
TimeRepresentation.DATE
without loss of precision! This is why, if DATE
is configured for the EventStore
, Occurrent will refuse to store a CloudEvent
that specifies nanotime
and is not defined in UTC (so that there won’t be any surprises). I.e. using DATE
and then doing this will throw an IllegalArgumentException
:
var cloudEvent = new CloudEventBuilder().time(OffsetDateTime.now()). .. .build();
// Will throw exception since OffsetDateTime.now() will include nanoseconds by default in Java 9+
eventStore.write(Stream.of(cloudEvent));
Instead, you need to remove nano seconds do like this explicitly:
// Remove millis and make sure to use UTC as timezone
var now = OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS).withOffsetSameInstant(ZoneOffset.UTC);
var cloudEvent = new CloudEventBuilder().time(now). .. .build();
// Now you can write the cloud event
For more thoughts on this, refer to the architecture decision record on time representation in MongoDB.
MongoDB Indexes
Each MongoDB EventStore
implementation creates a few indexes for the “events collection” the first time they’re instantiated. These are:
Name | Properties | Description |
---|---|---|
id + source |
ascending id ,descending source , unique |
Compound index of id and source to comply with the specification that the id +source combination must be unique. |
streamid + streamversion |
ascending streamid ,descending streamversion ,unique |
Compound index of streamid and streamversion (Occurrent CloudEvent extension) used for fast retrieval of the latest cloud event in a stream. |
streamid
index was also automatically created, but it was removed in 0.7.3 since this index is covered by the streamid+streamversion
index.To allow for fast queries, for example when using EventStoreQueries, it’s recommended to create additional indexes tailored to the querying behavior of your application. See MongoDB indexes for more information on how to do this. If you have many adhoc queries it’s also worth checking out wildcard indexes which is a new feature in MongoDB 4.2. These allow you to create indexes that allow for arbitrary queries on e.g. the data attribute of a cloud event (if data is stored in json/bson format).
MongoDB EventStore Implementations
There are three different MongoDB EventStore implementations to choose from:
EventStore with MongoDB Native Driver
What is it?
An EventStore implementation that uses the “native” Java MongoDB synchronous driver (see website) to read and write CloudEvent’s to MongoDB.
When to use?
Use when you don’t need Spring support and want to use MongoDB as the underlying datastore.
Dependencies
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>eventstore-mongodb-native</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:eventstore-mongodb-native:0.19.6'
libraryDependencies += "org.occurrent" % "eventstore-mongodb-native" % "0.19.6"
@Grab(group='org.occurrent', module='eventstore-mongodb-native', version='0.19.6')
[org.occurrent/eventstore-mongodb-native "0.19.6"]
'org.occurrent:eventstore-mongodb-native:jar:0.19.6'
<dependency org="org.occurrent" name="eventstore-mongodb-native" rev="0.19.6" />
Getting Started
Once you’ve imported the dependencies you create a new instance of org.occurrent.eventstore.mongodb.nativedriver.MongoEventStore
.
It takes four arguments, a MongoClient,
the “database” and “event collection “that the EventStore will use to store events as well as an org.occurrent.eventstore.mongodb.nativedriver.EventStoreConfig
.
For example:
- Java
- Kotlin
MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
// How the CloudEvent "time" property will be serialized in MongoDB! !!Important!!
EventStoreConfig config = new EventStoreConfig(TimeRepresentation.RFC_3339_STRING);
String mongoDatabase = "database";
String mongoEventCollection = "events";
MongoEventStore eventStore = new MongoEventStore(mongoClient, mongoDatabase, mongoEventCollection, config);
val mongoClient = MongoClients.create("mongodb://localhost:27017")
// How the CloudEvent "time" property will be serialized in MongoDB! !!Important!!
val config = EventStoreConfig(TimeRepresentation.RFC_3339_STRING)
val mongoDatabase = "database"
val mongoEventCollection = "events"
val eventStore = MongoEventStore(mongoClient, mongoDatabase, mongoEventCollection, config)
Now you can start reading and writing events to the EventStore:
- Java
- Kotlin
CloudEvent event = CloudEventBuilder.v1()
.withId("eventId")
.withSource(URI.create("urn:mydomain"))
.withType("HelloWorld")
.withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
.withSubject("subject")
.withDataContentType("application/json")
.withData("{ \"message\" : \"hello\" }".getBytes(StandardCharsets.UTF_8))
.build();
// Write
eventStore.write("streamId", Stream.of(event));
// Read
EventStream<CloudEvent> eventStream = eventStore.read("streamId");
val event = CloudEventBuilder.v1()
.withId("eventId")
.withSource(URI.create("urn:mydomain"))
.withType("HelloWorld")
.withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
.withSubject("subject")
.withDataContentType("application/json")
.withData("{ \"message\" : \"hello\" }".toByteArray())
.build();
// Write
eventStore.write("streamId", Stream.of(event))
// Read
val eventStream : EventStream<CloudEvent> = eventStore.read("streamId")
Examples
Name | Description |
---|---|
Number Guessing Game | A simple game implemented using a pure domain model and stores events in MongoDB using MongoEventStore . It also generates integration events and publishes these to RabbitMQ. |
Uno | A port of FsUno, a classic card game. Stores events in MongoDB using MongoEventStore . |
EventStore with Spring MongoTemplate (Blocking)
What is it?
An implementation that uses Spring’s MongoTemplate to read and write events to/from MongoDB.
When to use?
If you’re already using Spring and you don’t need reactive support then this is a good choice. You can make use of the @Transactional
annotation to write events and views in the same transaction (but make sure you understand what you’re going before attempting this).
Dependencies
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>eventstore-mongodb-spring-blocking</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:eventstore-mongodb-spring-blocking:0.19.6'
libraryDependencies += "org.occurrent" % "eventstore-mongodb-spring-blocking" % "0.19.6"
@Grab(group='org.occurrent', module='eventstore-mongodb-spring-blocking', version='0.19.6')
[org.occurrent/eventstore-mongodb-spring-blocking "0.19.6"]
'org.occurrent:eventstore-mongodb-spring-blocking:jar:0.19.6'
<dependency org="org.occurrent" name="eventstore-mongodb-spring-blocking" rev="0.19.6" />
Getting Started
Once you’ve imported the dependencies you create a new instance of org.occurrent.eventstore.mongodb.spring.blocking.SpringMongoEventStore
.
It takes two arguments, a MongoTemplate and
an org.occurrent.eventstore.mongodb.spring.blocking.EventStoreConfig
.
For example:
- Java
- Kotlin
MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
MongoTransactionManager mongoTransactionManager = new MongoTransactionManager(new SimpleMongoClientDatabaseFactory(mongoClient, "database"));
MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, "database");
EventStoreConfig eventStoreConfig = new EventStoreConfig.Builder()
// The collection where all events will be stored
.eventStoreCollectionName("events")
.transactionConfig(mongoTransactionManager)
// How the CloudEvent "time" property will be serialized in MongoDB! !!Important!!
.timeRepresentation(TimeRepresentation.RFC_3339_STRING)
.build();
SpringMongoEventStore eventStore = new SpringMongoEventStore(mongoTemplate, eventStoreConfig);
val mongoClient = MongoClients.create("mongodb://localhost:27017")
val mongoTransactionManager = MongoTransactionManager(SimpleMongoClientDatabaseFactory(mongoClient, "database"))
val mongoTemplate = MongoTemplate(mongoClient, "database")
val eventStoreConfig = EventStoreConfig.Builder()
// The collection where all events will be stored
.eventStoreCollectionName("events")
.transactionConfig(mongoTransactionManager)
// How the CloudEvent "time" property will be serialized in MongoDB! !!Important!!
.timeRepresentation(TimeRepresentation.RFC_3339_STRING)
.build()
val eventStore = SpringMongoEventStore(mongoTemplate, eventStoreConfig)
Now you can start reading and writing events to the EventStore:
- Java
- Kotlin
CloudEvent event = CloudEventBuilder.v1()
.withId("eventId")
.withSource(URI.create("urn:mydomain"))
.withType("HelloWorld")
.withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
.withSubject("subject")
.withDataContentType("application/json")
.withData("{ \"message\" : \"hello\" }".getBytes(StandardCharsets.UTF_8))
.build();
// Write
eventStore.write("streamId", Stream.of(event));
// Read
EventStream<CloudEvent> eventStream = eventStore.read("streamId");
val event = CloudEventBuilder.v1()
.withId("eventId")
.withSource(URI.create("urn:mydomain"))
.withType("HelloWorld")
.withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
.withSubject("subject")
.withDataContentType("application/json")
.withData("{ \"message\" : \"hello\" }".toByteArray())
.build();
// Write
eventStore.write("streamId", Stream.of(event))
// Read
val eventStream : EventStream<CloudEvent> = eventStore.read("streamId")
Examples
Name | Description |
---|---|
Number Guessing Game | A simple game implemented using a pure domain model and stores events in MongoDB using SpringMongoEventStore and Spring Boot. It also generates integration events and publishes these to RabbitMQ. |
Word Guessing Game | Similar to the “Number Guessing Game” but more advanced, leveraging several Occurrent features such as CQRS, queries, and transactional projections. Implemented using a pure domain model and stores events in MongoDB using SpringMongoEventStore and Spring Boot. |
Uno | A port of FsUno, a classic card game. Implemented using a pure domain model and stores events in MongoDB using SpringMongoEventStore and Spring Boot. |
Subscription View | An example showing how to create a subscription that listens to certain events stored in the EventStore and updates a view/projection from these events. |
Transactional View | An example showing how to combine writing events to the SpringMongoEventStore and update a view transactionally using the @Transactional annotation. |
Custom Aggregation View | Example demonstrating that you can query the SpringMongoEventStore using custom MongoDB aggregations. |
EventStore with Spring ReactiveMongoTemplate (Reactive)
What is it?
An implementation that uses Spring’s ReactiveMongoTemplate to read and write events to/from MongoDB.
When to use?
If you’re already using Spring and want to use the reactive driver (project reactor) then this is a good choice. It uses the ReactiveMongoTemplate
to write events to MongoDB. You can make use of the @Transactional
annotation to write events and views in the same tx (but make sure that you understand what you’re going before attempting this).
Dependencies
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>eventstore-mongodb-spring-reactor</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:eventstore-mongodb-spring-reactor:0.19.6'
libraryDependencies += "org.occurrent" % "eventstore-mongodb-spring-reactor" % "0.19.6"
@Grab(group='org.occurrent', module='eventstore-mongodb-spring-reactor', version='0.19.6')
[org.occurrent/eventstore-mongodb-spring-reactor "0.19.6"]
'org.occurrent:eventstore-mongodb-spring-reactor:jar:0.19.6'
<dependency org="org.occurrent" name="eventstore-mongodb-spring-reactor" rev="0.19.6" />
Getting Started
Once you’ve imported the dependencies you create a new instance of org.occurrent.eventstore.mongodb.spring.blocking.SpringMongoEventStore
.
It takes two arguments, a MongoTemplate and
an org.occurrent.eventstore.mongodb.spring.blocking.EventStoreConfig
.
For example:
- Java
- Kotlin
MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
MongoTransactionManager mongoTransactionManager = new MongoTransactionManager(new SimpleMongoClientDatabaseFactory(mongoClient, "database"));
MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, "database");
EventStoreConfig eventStoreConfig = new EventStoreConfig.Builder()
// The collection where all events will be stored
.eventStoreCollectionName("events")
.transactionConfig(mongoTransactionManager)
// How the CloudEvent "time" property will be serialized in MongoDB! !!Important!!
.timeRepresentation(TimeRepresentation.RFC_3339_STRING)
.build();
ReactorMongoEventStore eventStore = new ReactorMongoEventStore(mongoTemplate, eventStoreConfig);
val mongoClient = MongoClients.create("mongodb://localhost:27017")
val mongoTransactionManager = MongoTransactionManager(SimpleMongoClientDatabaseFactory(mongoClient, "database"))
val mongoTemplate = MongoTemplate(mongoClient, "database")
val eventStoreConfig = EventStoreConfig.Builder()
// The collection where all events will be stored
.eventStoreCollectionName("events")
.transactionConfig(mongoTransactionManager)
// How the CloudEvent "time" property will be serialized in MongoDB! !!Important!!
.timeRepresentation(TimeRepresentation.RFC_3339_STRING)
.build()
val eventStore = ReactorMongoEventStore(mongoTemplate, eventStoreConfig)
Now you can start reading and writing events to the EventStore:
- Java
- Kotlin
CloudEvent event = CloudEventBuilder.v1()
.withId("eventId")
.withSource(URI.create("urn:mydomain"))
.withType("HelloWorld")
.withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
.withSubject("subject")
.withDataContentType("application/json")
.withData("{ \"message\" : \"hello\" }".getBytes(StandardCharsets.UTF_8))
.build();
// Write
Mono<Void> mono = eventStore.write("streamId", Flux.just(event));
// Read
Mono<EventStream<CloudEvent>> eventStream = eventStore.read("streamId");
val event = CloudEventBuilder.v1()
.withId("eventId")
.withSource(URI.create("urn:mydomain"))
.withType("HelloWorld")
.withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
.withSubject("subject")
.withDataContentType("application/json")
.withData("{ \"message\" : \"hello\" }".toByteArray())
.build();
// Write
val mono = eventStore.write("streamId", Flux.just(event))
// Read
val eventStream : Mono<EventStream<CloudEvent>> = eventStore.read("streamId")
Examples
Name | Description |
---|---|
Custom Aggregation View | Example demonstrating that you can query the SpringMongoEventStore using custom MongoDB aggregations. |
In-Memory EventStore
What is it?
A simple in-memory implementation of the EventStore
interface.
When to use?
Mainly for testing purposes or for integration tests that doesn’t require a durable event store.
Dependencies
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>eventstore-inmemory</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:eventstore-inmemory:0.19.6'
libraryDependencies += "org.occurrent" % "eventstore-inmemory" % "0.19.6"
@Grab(group='org.occurrent', module='eventstore-inmemory', version='0.19.6')
[org.occurrent/eventstore-inmemory "0.19.6"]
'org.occurrent:eventstore-inmemory:jar:0.19.6'
<dependency org="org.occurrent" name="eventstore-inmemory" rev="0.19.6" />
Getting Started
Once you’ve imported the dependencies you create a new instance of org.occurrent.eventstore.inmemory.InMemoryEventStore
. For example:
- Java
- Kotlin
InMemoryEventStore eventStore = new InMemoryEventStore();
val eventStore = InMemoryEventStore()
Now you can start reading and writing events to the EventStore:
- Java
- Kotlin
CloudEvent event = CloudEventBuilder.v1()
.withId("eventId")
.withSource(URI.create("urn:mydomain"))
.withType("HelloWorld")
.withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
.withSubject("subject")
.withDataContentType("application/json")
.withData("{ \"message\" : \"hello\" }".getBytes(StandardCharsets.UTF_8))
.build();
// Write
eventStore.write("streamId", Stream.of(event));
// Read
EventStream<CloudEvent> eventStream = eventStore.read("streamId");
val event = CloudEventBuilder.v1()
.withId("eventId")
.withSource(URI.create("urn:mydomain"))
.withType("HelloWorld")
.withTime(LocalDateTime.now().atOffset(ZoneOffset.UTC))
.withSubject("subject")
.withDataContentType("application/json")
.withData("{ \"message\" : \"hello\" }".toByteArray())
.build();
// Write
eventStore.write("streamId", Stream.of(event))
// Read
val eventStream : EventStream<CloudEvent> = eventStore.read("streamId")
Using Subscriptions
There a two different kinds of subscriptions, blocking subscriptions and reactive subscriptions. For blocking subscription implementations see here and for reactive subscription implementations see here.
Blocking Subscriptions
A “blocking subscription” is a subscription that uses the normal Java threading mechanism for IO operations, i.e. reading changes from an EventStore will block the thread. This is arguably the easiest and most familiar way to use subscriptions for the typical Java developer, and it’s probably good-enough for most scenarios. If high throughput, low CPU and memory-consumption is critical then consider using reactive subscription instead. Reactive subscriptions are also better suited if you want to work with streaming data.
To create a blocking subscription, you first need to choose which “subscription model” to use. Then you create a subscription instance from this subscription model.
All blocking subscriptions implements the org.occurrent.subscription.api.blocking.SubscriptionModel
interface. This interface provide means to subscribe to new events from an EventStore
as they are written. For example:
- Java
- Kotlin
subscriptionModel.subscribe("mySubscriptionId", System.out::println);
subscriptionModel.subscribe("mySubscriptionId", ::println)
This will simply print each cloud event written to the event store to the console.
Note that the signature of subscribe
is defined like this:
public interface SubscriptionModel {
/**
* Start listening to cloud events persisted to the event store using the supplied start position and <code>filter</code>.
*
* @param subscriptionId The id of the subscription, must be unique!
* @param filter The filter used to limit which events that are of interest from the EventStore.
* @param startAt The position to start the subscription from
* @param action This action will be invoked for each cloud event that is stored in the EventStore.
*/
Subscription subscribe(String subscriptionId, SubscriptionFilter filter, StartAt startAt, Consumer<CloudEvent> action);
// Default methods
}
It’s common that subscriptions produce “wrappers” around the vanilla io.cloudevents.CloudEvent
type that includes
the subscription position (if the datastore doesn’t maintain the subscription position on behalf of the clients). Someone, either you as the client or the datastore, needs to keep track of this position
for each individual subscriber (“mySubscriptionId” in the example above). If the datastore doesn’t provide this feature, you should use a SubscriptionModel
implementation that also implement the
org.occurrent.subscription.api.blocking.PositionAwareSubscriptionModel
interface. The PositionAwareSubscriptionModel
is an example of a SubscriptionModel
that returns a wrapper around
io.cloudevents.CloudEvent
called org.occurrent.subscription.PositionAwareCloudEvent
which adds an additional method, SubscriptionPosition getStreamPosition()
, that you can use to get
the current subscription position. You can check if a cloud event contains a subscription position by calling PositionAwareCloudEvent.hasSubscriptionPosition(cloudEvent)
and then get the position by using PositionAwareCloudEvent.getSubscriptionPositionOrThrowIAE(cloudEvent)
. Note that PositionAwareCloudEvent
is fully compatible with io.cloudevents.CloudEvent
and it’s ok to treat it as such. So given that
you’re subscribing from a PositionAwareSubscriptionModel
, you are responsible for keeping track of the subscription position, so
that it’s possible to resume this subscription from the last known position on application restart. This interface also provides means to get the so called “current global subscription position”,
by calling the globalSubscriptionPosition
method which can be useful when starting a new subscription.
For example, consider the case when subscription “A” starts
subscribing at the current time (T1). Event E1 is written to the EventStore
and propagated to subscription “A”. But imagine there’s a bug in “A” that prevents it
from performing its action. Later, the bug is fixed and the application is restarted at the “current time” (T2). But since T2 is after T1, E1 will not sent to “A” again since
it happened before T2. Thus this event is missed! Whether or not this is actually a problem depends on your use case. But to avoid it you should not start the subscription
at the “current time”, but rather from the “global subscription position”. This position should be written to a subscription position storage
before subscription “A” is started. Thus the subscription can continue from this position on application restart and no events will be missed.
Blocking Subscription Filters
You can also provide a subscription filter, applied at the datastore level so that it’s really efficient, if you’re only interested in certain events:
- Java
- Kotlin
subscriptionModel.subscribe("mySubscriptionId", filter(type("GameEnded")), System.out::println);
subscriptionModel.subscribe("mySubscriptionId", filter(type("GameEnded")), ::println)
This will print each cloud event written to the event store, and has type equal to “GameEnded”, to the console.
The filter
method is statically imported from org.occurrent.subscription.OccurrentSubscriptionFilter
and type
is statically imported from org.occurrent.condition.Condition
.
The OccurrentSubscriptionFilter
is generic and should be applicable to a wide variety of different datastores. However, subscription implementations
may provide different means to express filters. For example, the MongoDB subscription implementations allows you to use filters specific to MongoDB:
- Java
- Kotlin
subscriptionModel.subscribe("mySubscriptionId", filter().id(Filters::eq, "3c0364c3-f4a7-40d3-9fb8-a4a62d7f66e3").type(Filters::eq, "GameStarted")), System.out::println);
subscriptionModel.subscribe("mySubscriptionId", filter().id(Filters::eq, "3c0364c3-f4a7-40d3-9fb8-a4a62d7f66e3").type(Filters::eq, "GameStarted")), ::println)
Now filter
is statically imported from org.occurrent.subscription.mongodb.MongoDBFilterSpecification
and Filters
is imported from
com.mongodb.client.model.Filters
(i.e the normal way to express filters in MongoDB). However, it’s recommended to always start with an OccurrentSubscriptionFilter
and only pick a more specific implementation if you cannot express your filter using the capabilities of OccurrentSubscriptionFilter
.
Blocking Subscription Start Position
A subscription can can be started at different locations in the event store. You can define where to start when a subscription is started. This is done by supplying a
org.occurrent.subscription.StartAt
instance. It provides several ways to specify the start position, either by using StartAt.now()
, StartAt.subscriptionModelDefault()
(default if StartAt
is not defined when
calling the subscribe
function), or StartAt.subscriptionPosition(<subscriptionPosition>)
, where <subscriptionPosition>
is a datastore-specific
implementation of the org.occurrent.subscription.SubscriptionPosition
interface which provides the start position as a String
. You may want to store the
String
returned by a SubscriptionPosition
in a database so that it’s possible to resume a subscription from the last processed position on application restart.
You can do this anyway you like, but for most cases you probably should consider if there’s a Subscription Position Storage
available that suits your needs. If not, you can still have a look at them for inspiration on how to write your own.
Blocking Subscription Position Storage
It’s very common that an application needs to start at its last known location in the subscription stream when it’s restarted. While you’re free to store the subscription position
provided by a blocking subscription any way you like, Occurrent provides an interface
called org.occurrent.subscription.api.blocking.SubscriptionPositionStorage
acts as a uniform abstraction for this purpose. A SubscriptionPositionStorage
is defined like this:
public interface SubscriptionPositionStorage {
SubscriptionPosition read(String subscriptionId);
SubscriptionPosition save(String subscriptionId, SubscriptionPosition subscriptionPosition);
void delete(String subscriptionId);
}
I.e. it’s a way to read/write/delete the SubscriptionPosition
for a given subscription. Occurrent ships with three pre-defined implementations:
1. NativeMongoSubscriptionPositionStorage
Uses the vanilla MongoDB Java (sync) driver to store SubscriptionPosition
’s in MongoDB.
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>subscription-mongodb-native-blocking-position-storage</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-native-blocking-position-storage:0.19.6'
libraryDependencies += "org.occurrent" % "subscription-mongodb-native-blocking-position-storage" % "0.19.6"
@Grab(group='org.occurrent', module='subscription-mongodb-native-blocking-position-storage', version='0.19.6')
[org.occurrent/subscription-mongodb-native-blocking-position-storage "0.19.6"]
'org.occurrent:subscription-mongodb-native-blocking-position-storage:jar:0.19.6'
<dependency org="org.occurrent" name="subscription-mongodb-native-blocking-position-storage" rev="0.19.6" />
2. SpringMongoSubscriptionPositionStorage
Uses the Spring MongoTemplate to store SubscriptionPosition
’s in MongoDB.
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>subscription-mongodb-spring-blocking-position-storage</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-spring-blocking-position-storage:0.19.6'
libraryDependencies += "org.occurrent" % "subscription-mongodb-spring-blocking-position-storage" % "0.19.6"
@Grab(group='org.occurrent', module='subscription-mongodb-spring-blocking-position-storage', version='0.19.6')
[org.occurrent/subscription-mongodb-spring-blocking-position-storage "0.19.6"]
'org.occurrent:subscription-mongodb-spring-blocking-position-storage:jar:0.19.6'
<dependency org="org.occurrent" name="subscription-mongodb-spring-blocking-position-storage" rev="0.19.6" />
3. SpringRedisSubscriptionPositionStorage
Uses the Spring RedisTemplate to store SubscriptionPosition
’s in Redis.
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>subscription-redis-spring-blocking-position-storage</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:subscription-redis-spring-blocking-position-storage:0.19.6'
libraryDependencies += "org.occurrent" % "subscription-redis-spring-blocking-position-storage" % "0.19.6"
@Grab(group='org.occurrent', module='subscription-redis-spring-blocking-position-storage', version='0.19.6')
[org.occurrent/subscription-redis-spring-blocking-position-storage "0.19.6"]
'org.occurrent:subscription-redis-spring-blocking-position-storage:jar:0.19.6'
<dependency org="org.occurrent" name="subscription-redis-spring-blocking-position-storage" rev="0.19.6" />
If you want to roll your own implementation (feel free to contribute to the project if you do) you can depend on the “blocking subscription API” which contains the SubscriptionPositionStorage
interface:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>subscription-api-blocking</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:subscription-api-blocking:0.19.6'
libraryDependencies += "org.occurrent" % "subscription-api-blocking" % "0.19.6"
@Grab(group='org.occurrent', module='subscription-api-blocking', version='0.19.6')
[org.occurrent/subscription-api-blocking "0.19.6"]
'org.occurrent:subscription-api-blocking:jar:0.19.6'
<dependency org="org.occurrent" name="subscription-api-blocking" rev="0.19.6" />
Blocking Subscription Implementations
These are the non-durable blocking subscription implementations:
MongoDB
- Blocking subscription using the “native” Java MongoDB driver
- Blocking subscription using Spring MongoTemplate
In-Memory
By “non-durable” we mean implementations that doesn’t store the subscription position in a durable storage automatically.
It might be that the datastore does this automatically or that subscription position storage is not required
for your use case. If the datastore doesn’t support storing the subscription position automatically, a subscription will typically implement the
org.occurrent.subscription.api.blocking.PositionAwareSubscriptionModel
interface (since these types of subscriptions needs to be aware of the position).
Typically, if you want the stream to continue where it left off on application restart you want to store away the subscription position. You can do this anyway you like,
but for most cases you probably want to look into implementations of org.occurrent.subscription.api.blocking.PositionAwareSubscriptionModel
.
These subscriptions can be combined with a subscription position storage implementation to store the position in a durable
datastore.
Occurrent provides a utility that combines a PositionAwareSubscriptionModel
and
a SubscriptionPositionStorage
(see here) to automatically store the subscription position
after each processed event. You can tweak how often the position should be persisted in the configuration.
Blocking Subscription using the “Native” Java MongoDB Driver
Uses the vanilla Java MongoDB synchronous driver (no Spring dependency is required).
To get started first include the following dependency:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>subscription-mongodb-native-blocking</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-native-blocking:0.19.6'
libraryDependencies += "org.occurrent" % "subscription-mongodb-native-blocking" % "0.19.6"
@Grab(group='org.occurrent', module='subscription-mongodb-native-blocking', version='0.19.6')
[org.occurrent/subscription-mongodb-native-blocking "0.19.6"]
'org.occurrent:subscription-mongodb-native-blocking:jar:0.19.6'
<dependency org="org.occurrent" name="subscription-mongodb-native-blocking" rev="0.19.6" />
Then create a new instance of NativeMongoSubscriptionModel
and start subscribing:
- Java
- Kotlin
MongoDatabase database = mongoClient.getDatabase("some-database");
// Create the blocking subscription
SubscriptionModel subscriptionModel = new NativeMongoSubscriptionModel(database, "eventCollection", TimeRepresentation.DATE, Executors.newCachedThreadPool(), RetryStrategy.retry().fixed(200));
// Now you can create subscription instances that subscribes to new events as they're written to an EventStore
subscriptionModel.subscribe("mySubscriptionId", cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent));
// You can later cancel the subscription by calling:
subscriptionModel.cancelSubscription("mySubscriptionId");
val database = mongoClient.getDatabase("some-database")
// Create the blocking subscription
val subscriptionModel = NativeMongoSubscriptionModel(database, "eventCollection", TimeRepresentation.DATE, Executors.newCachedThreadPool(), RetryStrategy.retry().fixed(200))
// Now you can create subscriptions instances that subscribes to new events as they're written to an EventStore
subscriptionModel.subscribe("mySubscriptionId") { cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent) }
// You can later cancel the subscription by calling:
subscriptionModel.cancelSubscription("mySubscriptionId")
There are a few things to note here that needs explaining. First we have the TimeRepresentation.DATE
that is passed as the third constructor argument which you can read more about
here. Secondly we have the Executors.newCachedThreadPool()
. A thread will be created from this executor for each call to
“subscribe” (i.e. for each subscription). Make sure that you have enough threads to cover all your subscriptions or the “SubscriptionModel” may hang.
Last we have the RetryStrategy which defines what should happen if there’s e.g. a connection issue during the life-time of a subscription or if subscription fails to process a cloud event
(i.e. the action
throws an exception).
Note that you can provide a filter, start position and position persistence for this subscription implementation.
Blocking Subscription using Spring MongoTemplate
An implementation that uses Spring’s MongoTemplate for event subscriptions.
First include the following dependency:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>subscription-mongodb-spring-blocking</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-spring-blocking:0.19.6'
libraryDependencies += "org.occurrent" % "subscription-mongodb-spring-blocking" % "0.19.6"
@Grab(group='org.occurrent', module='subscription-mongodb-spring-blocking', version='0.19.6')
[org.occurrent/subscription-mongodb-spring-blocking "0.19.6"]
'org.occurrent:subscription-mongodb-spring-blocking:jar:0.19.6'
<dependency org="org.occurrent" name="subscription-mongodb-spring-blocking" rev="0.19.6" />
Then create a new instance of SpringMongoSubscriptionModel
and start subscribing:
- Java
- Kotlin
MongoTemplate mongoTemplate = ...
// Create the blocking subscription
SubscriptionModel subscriptionModel = new SpringMongoSubscriptionModel(mongoTemplate, "eventCollectionName", TimeRepresentation.RFC_3339_STRING);
// Now you can create subscriptions instances that subscribes to new events as they're written to an EventStore
// Typically you do this after the Spring application context has finished loading. For example by subscribing to
// the (org.springframework.boot.context.event.ApplicationStartedEvent) or in a method annotated with (@PostConstruct)
subscriptionModel.subscribe("mySubscriptionId", cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent));
// You can later cancel the subscription by calling:
subscriptionModel.cancelSubscription("mySubscriptionId");
val mongoTemplate : MongoTemplate = ...
// Create the blocking subscription
val subscriptionModel = SpringMongoSubscriptionModel(mongoTemplate, "eventCollectionName", TimeRepresentation.RFC_3339_STRING)
// Now you can create subscriptions instances that subscribes to new events as they're written to an EventStore
subscriptionModel.subscribe("mySubscriptionId") { cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent) }
// You can later cancel the subscription by calling:
subscriptionModel.cancelSubscription("mySubscriptionId")
The “eventCollectionName” specifies the event collection in MongoDB where events are stored. It’s important that this collection is the same as the collection
used by the EventStore
implementation. Secondly, we have the TimeRepresentation.RFC_3339_STRING
that is passed as the third constructor argument, which you can read more about
here. It’s also very important that this is configured the same way as the EventStore
.
It should also be noted that Spring takes care of re-attaching to MongoDB if there’s a connection issue or other transient errors. This can be configured when creating the MongoTemplate
instance.
When it comes to retries, if the “action” fails (i.e. if the higher-order function you provide when calling subscribe
throws an exception), either using something like Spring Retry
or the Occurrent Retry Module. By default, all subscription models will use the Occurrent retry module with exponential backoff starting with 100 ms and progressively
go up to max 2 seconds wait time between each retry when reading/saving/deleting the subscription position. You can customize this by passing an instance of RetryStrategy
to the SpringMongoSubscriptionModel
constructor.
If you want to disable the Occurrent retry module, pass RetryStrategy.none()
to the SpringMongoSubscriptionModel
constructor and then handle retries anyway you find fit. For example, let’s say you want to use spring-retry
, and you have a simple Spring bean that writes each cloud event to a repository:
@Component
public class WriteToRepository {
private final SomeRepository someRepository;
public WriteToRepository(SomeRepository someRepository) {
this.someRepository = someRepository;
}
public void write(CloudEvent cloudEvent) {
someRepository.persist(cloudEvent);
}
}
And you want to subscribe to all “GameStarted” events and write them to the repository:
WriteToRepository writeToRepository = ...
subscriptionModel.subscribe("gameStartedLog", writeToRepository::write);
But if the connection to someRepository
is flaky you can add Spring Retry so allow for retry with exponential backoff:
@Component
public class WriteToRepository {
private final SomeRepository someRepository;
public WriteToRepository(SomeRepository someRepository) {
this.someRepository = someRepository;
}
@Retryable(backoff = @Backoff(delay = 200, multiplier = 2, maxDelay = 30000))
public void write(CloudEvent cloudEvent) {
someRepository.persist(cloudEvent);
}
}
Don’t forget to add @EnableRetry
in to your Spring Boot application as well.
Note that you can provide a filter, start position and position persistence for this subscription implementation.
Restart Subscription when Oplog Lost
If there’s not enough history available in the MongoDB oplog to resume a subscription created from a SpringMongoSubscriptionModel
, you can configure it to restart the subscription from the current
time automatically. This is only of concern when an application is restarted, and the subscriptions are configured to start from a position in the oplog that is no longer available. It’s disabled by default since it might not
be 100% safe (meaning that you can miss some events when the subscription is restarted). It’s not 100% safe if you run subscriptions in a different process than the event store and you have lot’s of
writes happening to the event store. It’s safe if you run the subscription in the same process as the writes to the event store if you make sure that the
subscription is started before you accept writes to the event store on startup. To enable automatic restart, you can do like this:
var subscriptionModel = new SpringMongoSubscriptionModel(mongoTemplate, SpringSubscriptionModelConfig.withConfig("events", TimeRepresentation.RFC_3339_STRING).restartSubscriptionsOnChangeStreamHistoryLost(true));
An alternative approach to restarting automatically is to use a catch-up subscription and restart the subscription from an earlier date.
InMemory Subscription
If you’re using the InMemory EventStore you can use the “InMemorySubscriptionModel” to subscribe to new events. For add the dependency:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>subscription-inmemory</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:subscription-inmemory:0.19.6'
libraryDependencies += "org.occurrent" % "subscription-inmemory" % "0.19.6"
@Grab(group='org.occurrent', module='subscription-inmemory', version='0.19.6')
[org.occurrent/subscription-inmemory "0.19.6"]
'org.occurrent:subscription-inmemory:jar:0.19.6'
<dependency org="org.occurrent" name="subscription-inmemory" rev="0.19.6" />
Then you can use it like this:
- Java
- Kotlin
InMemorySubscriptionModel subscriptionModel = new InMemorySubscriptionModel();
InMemoryEventStore inMemoryEventStore = new InMemoryEventStore(inMemorySubscriptionModel);
subscriptionModel.subscribe("subscription1", System.out::println);
// You can later cancel the subscription by calling:
subscriptionModel.cancelSubscription("mySubscriptionId");
val subscriptionModel = new InMemorySubscriptionModel()
val inMemoryEventStore = new InMemoryEventStore(inMemorySubscriptionModel)
subscriptionModel.subscribe("subscription1", ::println)
// You can later cancel the subscription by calling:
subscriptionModel.cancelSubscription("mySubscriptionId")
Durable Subscriptions (Blocking)
Storing the subscription position is useful if you need to resume a subscription from its last known position when restarting an application.
Occurrent provides a utility that implements SubscriptionModel
and combines a PositionAwareSubscriptionModel
and a SubscriptionPositionStorage
implementation
(see here) to automatically store the subscription position, by default,
after each processed event. If you don’t want the position to be persisted after every event, you can control how often this should happen by supplying a predicate
to DurableSubscriptionModelConfig
. There’s a pre-defined predicate, org.occurrent.subscription.util.predicate.EveryN
, that allow
the position to be stored for every n event instead of simply every event. There’s also a shortcut, e.g. new DurableSubscriptionModelConfig(3)
that
creates an instance of EveryN
that stores the position for every third event.
If you want full control, it’s recommended to pick a subscription position storage implementation, and store the position yourself using its API.
To use it, first we need to add the dependency:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>durable-subscription</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:durable-subscription:0.19.6'
libraryDependencies += "org.occurrent" % "durable-subscription" % "0.19.6"
@Grab(group='org.occurrent', module='durable-subscription', version='0.19.6')
[org.occurrent/durable-subscription "0.19.6"]
'org.occurrent:durable-subscription:jar:0.19.6'
<dependency org="org.occurrent" name="durable-subscription" rev="0.19.6" />
Then we should instantiate a PositionAwareSubscriptionModel
, that subscribes to the events from the event store, and an instance of a SubscriptionPositionStorage
,
that stores the subscription position, and combine them to a DurableSubscriptionModel
:
- Java
- Kotlin
// Create the non-durable blocking subscription instance
PositionAwareBlockingSubscription nonDurableSubscriptionModel = ...
// Create the storage
SubscriptionPositionStorage storage = ...
// Now combine the non-durable subscription model and the subscription position storage
SubscriptionModel durableSubscriptionModel = new DurableSubscriptionModel(nonDurableSubscriptionModel, storage);
// Start a subscription
durableSubscriptionModel.subscribe("mySubscriptionId", cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent));
// Create the non-durable blocking subscription instance
val nonDurableSubscriptionModel : PositionAwareBlockingSubscription = ...
// Create the storage
val storage : SubscriptionPositionStorage = ...
// Now combine the non-durable subscription model and the subscription position storage
val durableSubscriptionModel : SubscriptionModel = new DurableSubscriptionModel(nonDurableSubscriptionModel, storage)
// Start a subscription
durableSubscriptionModel.subscribe("mySubscriptionId") { cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent) };
Catch-up Subscription (Blocking)
When starting a new subscription it’s often useful to first replay historic events to get up-to-speed and then subscribing to new events as they arrive. A catch-up subscription allows for exactly this! It combines the EventStoreQueries API with a subscription and an optional subscription storage. It starts off by streaming historic events from the event store and then automatically switch to continuous streaming mode once the historic events have caught up.
To get start you need to add the following dependency:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>catchup-subscription</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:catchup-subscription:0.19.6'
libraryDependencies += "org.occurrent" % "catchup-subscription" % "0.19.6"
@Grab(group='org.occurrent', module='catchup-subscription', version='0.19.6')
[org.occurrent/catchup-subscription "0.19.6"]
'org.occurrent:catchup-subscription:jar:0.19.6'
<dependency org="org.occurrent" name="catchup-subscription" rev="0.19.6" />
For example:
- Java
- Kotlin
// Create the subscription position storage. Note that if PositionAwareBlockingSubscription
// is also storing the position, it's highly recommended to share the same BlockingSubscriptionPositionStorage instance.
SubscriptionPositionStorage storage = ...
// Create the subscription instance that will be used once the replay has caught up the latest event position
PositionAwareSubscriptionModel continuousSubscriptionModel = ...
// Instantiate an event store that implements the EventStoreQueries API
EventStoreQueries eventStoreQueries = ...
// Now combine the continuous subscription model and the subscription position storage to allow
// handing over to the continuous subscription once catch-up phase is completed.
// In this example, we also store the subscription position during catch-up for every third event.
// This is optional, but useful if you're reading a lot of events and don't want to risk restarting
// from the beginning if the application where to crash during the catch-up phase.
CatchupSubscriptionModel catchupSubscriptionModelModel = new CatchupSubscriptionModel(continuousSubscriptionModel, eventStoreQueries,
new CatchupSubscriptionModelConfig(useSubscriptionPositionStorage(storage)
.andPersistSubscriptionPositionDuringCatchupPhaseForEveryNEvents(3));
// Start a subscription that starts replaying events of type "GameEnded" from the beginning of time
catchupSubscriptionModelModel.subscribe("mySubscription", filter(type("GameEnded")), StartAtTime.beginningOfTime(), cloudEvent -> System.out.println(cloudEvent));
// Note that excluding "StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime())" like below would still start at
// the beginnning of time the first time, but on subsequent calls will start from the latest position storing the in the storage.
// This is recommended if you want to continue using the CatchupSubscriptionModel later when no catch-up is required
// (since the subscription has already caught up).
catchupSubscriptionModelModel.subscribe("mySubscription", filter(type("GameEnded")), cloudEvent -> System.out.println(cloudEvent));
// is also storing the position, it's highly recommended to share the same SubscriptionPositionStorage instance.
val storage : SubscriptionPositionStorage = ...
// Create the subscription instance that will be used once the replay has caught up the latest event position
val continuousSubscription : PositionAwareSubscriptionModel= ...
// Instantiate an event store that implements the EventStoreQueries API
val eventStoreQueries : EventStoreQueries = ...
// Now combine the continuous subscription model and the subscription position storage to allow
// handing over to the continuous subscription once catch-up phase is completed.
// In this example, we also store the subscription position during catch-up for every third event.
// This is optional, but useful if you're reading a lot of events and don't want to risk restarting
// from the beginning if the application where to crash during the catch-up phase. If you don't
// use "andPersistSubscriptionPositionDuringCatchupPhaseForEveryNEvents" but just "useSubscriptionPositionStorage(storage)"
// then the CatchupSubscriptionModel will still start from the position in the storage, but not write to it.
// The continuous subscription (passed as first parameter to CatchupSubscriptionModel) might write to the store,
// which means that once the CatchupSubscriptionModel has caught up and the continuous subscription starts
// writing the position, the CatchupSubscriptionModel will just delegate to continuous subscription if it finds
// a position in the storage.
val catchupSubscriptionModel = CatchupSubscriptionModel(continuousSubscription, eventStoreQueries,
CatchupSubscriptionModelConfig(useSubscriptionPositionStorage(storage)
.andPersistSubscriptionPositionDuringCatchupPhaseForEveryNEvents(3))
// Start a subscription that starts replaying events of type "GameEnded" from the beginning of time
catchupSubscriptionModel.subscribe("mySubscription", filter(type("GameEnded")), StartAt.beginningOfTime()) { cloudEvent ->
println(cloudEvent)
}
// Note that excluding "StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime())" like below would still start at
// the beginnning of time the first time, but on subsequent calls will start from the latest position storing the in the storage.
// This is recommended if you want to continue using the CatchupSubscriptionModel later when no catch-up is required
// (since the subscription has already caught up).
catchupSubscriptionModel.subscribe("mySubscription", filter(type("GameEnded"))) { cloudEvent ->
println(cloudEvent)
}
To reduce the likelihood of duplicate events when switching from replay mode to continuous mode, a CatchupSubscriptionModel
maintains an in-memory cache of event ids.
The size of this cache is configurable using a CatchupSubscriptionModelConfig
but it defaults to 100. Otherwise, there would be a chance
that event written exactly when the switch from replay mode to continuous mode takes places, can be lost. To prevent this, the continuous mode subscription
starts at a position before the last event read from the history. The purpose of the cache is thus to filter away events that are detected as duplicates during the
switch. If the cache is too small, duplicate events will be sent to the continuous subscription. Typically, you want your application to be idempotent anyways and if so this shouldn’t be a problem.
A CatchupSubscriptionModel
can be configured to store the subscription position in the supplied storage (see example above) so that, if the application crashes during replay mode, it doesn’t need to
start replaying from the beginning again. Note that if you don’t want subscription persistence during replay, you can disable this by doing new CatchupSubscriptionModelConfig(dontUseSubscriptionPositionStorage())
.
It’s also possible to change how the CatchupSubscriptionModel
sorts events read from the event store during catch-up phase. For example:
var subscriptionModel = ...
var eventStore = ..
var cfg = new CatchupSubscriptionModelConfig(100).catchupPhaseSortBy(SortBy.descending(TIME));
var catchupSubscriptionModel = CatchupSubscriptionModel(subscriptionModel, eventStore, cfg);
By default, events are sorted by time and then stream version (if two or more events have the same time).
Catch-up Subscription Usage
The subscription model will only stream historic events if started with a StartAt
instance with a so called TimeBasedSubscriptionPosition
, for example:
- Java
- Kotlin
subscriptionModel.subscribe("subscriptionId", StartAt.subscriptionPosition(TimeBasedSubscription.beginningOfTime()), e -> System.out.println("Event: " + e);
subscriptionModel.subscribe("subscriptionId", StartAt.subscriptionPosition(TimeBasedSubscription.beginningOfTime())) { e ->
println("Event: $e")
}
If you don’t specify a StartAt
position (or specify StartAt.subscriptionModelDefault()
explicitly), the CatchupSubscriptionModel
will just delegate to the parent subscription model and
replay of old events will not happen. This means that for a subscription you can start it off by e.g. replaying from beginning of time then change the code and remove the StartAt
position.
It’ll then resume from the position of the last consumed event.
There are also some “shortcuts” to make it a bit more concise to start replay from beginning of time:
- Java
- Kotlin
var subscriptionModel = new CatchupSubscriptionModel(..);
// All examples below are equivalent:
subscriptionModel.subscribeFromBeginningOfTime("subscriptionId", e -> System.out.println("Event: " + e);
subscriptionModel.subscribe("subscriptionId", StartAtTime.beginningOfTime(), e -> System.out.println("Event: " + e);
subscriptionModel.subscribe("subscriptionId", StartAt.subscriptionPosition(TimeBasedSubscription.beginningOfTime()), e -> System.out.println("Event: " + e);
val subscriptionModel = new CatchupSubscriptionModel(..);
// All examples below are equivalent:
subscriptionModel.subscribeFromBeginningOfTime("subscriptionId") { e -> println("Event: $") }
subscriptionModel.subscribe("subscriptionId", StartAtTime.beginningOfTime()) { e -> println("Event: $e") }
subscriptionModel.subscribe("subscriptionId", StartAt.subscriptionPosition(TimeBasedSubscription.beginningOfTime()) { e -> println("Event: $") }
// beginningOfTime is an extension function imported from org.occurrent.subscription.blocking.durable.catchup.CatchupSubscriptionModelExtensions.kt
subscriptionModel.subscribe("subscriptionId", StartAt.beginningOfTime()) { e -> println("Event: $") }
It’s also possible to start from a specific java.time.OffsetDateTime
, for example:
- Java
- Kotlin
var offsetDateTime = OffsetDateTime.of(2024, 2, 3, 10, 4, 2, 0, ZoneOffset.UTC)
subscriptionModel.subscribe("subscriptionId", StartAtTime.offsetDateTime(offsetDateTime)) { e -> println("Event: $e") }
val offsetDateTime = OffsetDateTime.of(2024, 2, 3, 10, 4, 2, 0, ZoneOffset.UTC)
subscriptionModel.subscribe("subscriptionId", StartAt.offsetDateTime(offsetDateTime)) { e -> println("Event: $e") }
Competing Consumer Subscription (Blocking)
A competing consumer subscription model wraps another subscription model to allow several subscribers to subscribe to the same subscription. One of the subscribes will get a lock of the subscription
and receive events from it, the others will be in standby. If a subscriber looses its lock, another subscriber will take over automatically. To achieve distributed locking, the subscription model uses a org.occurrent.subscription.api.blocking.CompetingConsumerStrategy
to
support different algorithms. You can write custom algorithms by implementing this interface yourself. But to use it, first depend on the CompetingConsumerSubscriptionModel
:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>competing-consumer-subscription</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:competing-consumer-subscription:0.19.6'
libraryDependencies += "org.occurrent" % "competing-consumer-subscription" % "0.19.6"
@Grab(group='org.occurrent', module='competing-consumer-subscription', version='0.19.6')
[org.occurrent/competing-consumer-subscription "0.19.6"]
'org.occurrent:competing-consumer-subscription:jar:0.19.6'
<dependency org="org.occurrent" name="competing-consumer-subscription" rev="0.19.6" />
A CompetingConsumerSubscriptionModel
takes a CompetingConsumerStrategy
as second parameter. There are currently two different implementations, both are based on MongoDB. Use the following if you’re using the native Java MongoDB driver (i.e. you’re not using Spring):
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>subscription-mongodb-native-blocking-competing-consumer-strategy</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-native-blocking-competing-consumer-strategy:0.19.6'
libraryDependencies += "org.occurrent" % "subscription-mongodb-native-blocking-competing-consumer-strategy" % "0.19.6"
@Grab(group='org.occurrent', module='subscription-mongodb-native-blocking-competing-consumer-strategy', version='0.19.6')
[org.occurrent/subscription-mongodb-native-blocking-competing-consumer-strategy "0.19.6"]
'org.occurrent:subscription-mongodb-native-blocking-competing-consumer-strategy:jar:0.19.6'
<dependency org="org.occurrent" name="subscription-mongodb-native-blocking-competing-consumer-strategy" rev="0.19.6" />
The CompetingConsumerStrategy
implementation in this module is called NativeMongoLeaseCompetingConsumerStrategy
. If you’re using Spring, depend on this module instead:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>subscription-mongodb-spring-blocking-competing-consumer-strategy</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-spring-blocking-competing-consumer-strategy:0.19.6'
libraryDependencies += "org.occurrent" % "subscription-mongodb-spring-blocking-competing-consumer-strategy" % "0.19.6"
@Grab(group='org.occurrent', module='subscription-mongodb-spring-blocking-competing-consumer-strategy', version='0.19.6')
[org.occurrent/subscription-mongodb-spring-blocking-competing-consumer-strategy "0.19.6"]
'org.occurrent:subscription-mongodb-spring-blocking-competing-consumer-strategy:jar:0.19.6'
<dependency org="org.occurrent" name="subscription-mongodb-spring-blocking-competing-consumer-strategy" rev="0.19.6" />
The CompetingConsumerStrategy
implementation in this module is called SpringMongoLeaseCompetingConsumerStrategy
and it’s using the MongoTemplate
from the Spring ecosystem. Both of these strategies are heavily inspired by the awesome work
of Alec Henninger. To understand how these strategies work under the hood, refer to his blog post.
Just like several other subscription models, the CompetingConsumerSubscriptionModel
wraps another subscription model and decorates it with additional functionality, in this case to add competing consumer support to it.
Below is an example that uses NativeMongoLeaseCompetingConsumerStrategy
from module org.occurrent:subscription-mongodb-native-blocking-competing-consumer-strategy
with a DurableSubscriptionModel
which in turn wraps the Native MongoDB subscription model.
- Java
- Kotlin
MongoDatabase mongoDatabase = mongoClient.getDatabase("some-database");
SubscriptionPositionStorage positionStorage = new NativeMongoSubscriptionPositionStorage(mongoDatabase, "position-storage");
SubscriptionModel wrappedSubscriptionModel = new DurableSubscriptionModel(new NativeMongoSubscriptionModel(mongoDatabase, "events", TimeRepresentation.DATE), positionStorage);
// Create the CompetingConsumerSubscriptionModel
NativeMongoLeaseCompetingConsumerStrategy competingConsumerStrategy = NativeMongoLeaseCompetingConsumerStrategy.withDefaults(mongoDatabase);
CompetingConsumerSubscriptionModel competingConsumerSubscriptionModel = new CompetingConsumerSubscriptionModel(wrappedSubscriptionModel, competingConsumerStrategy);
// Now subscribe!
competingConsumerSubscriptionModel.subscribe("subscriptionId", type("SomeEvent"));
val mongoDatabase = mongoClient.getDatabase("some-database")
val positionStorage = NativeMongoSubscriptionPositionStorage(mongoDatabase, "position-storage")
val wrappedSubscriptionModel = new DurableSubscriptionModel(new NativeMongoSubscriptionModel(mongoDatabase, "events", TimeRepresentation.DATE), positionStorage)
// Create the CompetingConsumerSubscriptionModel
val competingConsumerStrategy = NativeMongoLeaseCompetingConsumerStrategy.withDefaults(mongoDatabase)
val competingConsumerSubscriptionModel = new CompetingConsumerSubscriptionModel(wrappedSubscriptionModel, competingConsumerStrategy)
// Now subscribe!
competingConsumerSubscriptionModel.subscribe("subscriptionId", type("SomeEvent"))
If the above code is executed on multiple nodes/processes, then only one subscriber will receive events.
Note that you can make several tweaks to the CompetingConsumerStrategy
using the Builder
, (new NativeMongoLeaseCompetingConsumerStrategy.Builder()
or new SpringMongoLeaseCompetingConsumerStrategy.Builder()
).
You can, for example, tweak how long the lease time should be for the lock (default is 20 seconds), the name of lease collection in MongoDB, as well as the retry strategy and other things.
Subscription Life-cycle & Testing (Blocking)
Subscription models may also implement the SubscriptionLifeCycle
interface (currently all blocking subscription models implements this). These subscription models supports canceling, pausing and resuming individual subscriptions. You can also stop an entire subscription model temporarily (stop
) and restart it later (start
).
Note the difference between canceling and pausing a subscription. Canceling a subscription will remove it and it’s not possible to resume it again later. Pausing a subscription will temporarily
pause the subscription, but it can later be resumed using the resumeSubscription
method.
Many of the methods in the SubscriptionLifeCycle
are good to have when you write integration tests.
It’s often useful to e.g. write events to the event store without triggering all subscriptions listening to the events. The life cycle methods allows you to selectively start/stop individual subscriptions so that you can (integration) test them in isolation.
Reactive Subscriptions
A “reactive subscription” is a subscription that uses non-blocking IO when reading events from the event store, i.e. reading changes from an EventStore will not block a thread. It uses concepts from reactive programming which is well-suited for working with streams of data. This is arguably a bit more complex for the typical Java developer, and you should consider using blocking subscriptions if high throughput, low CPU and memory-consumption is not critical.
To create a reactive subscription you first need to choose which “subscription model” to use. Then you create a subscription instance from this subscription model.
All reactive subscriptions implements the org.occurrent.subscription.api.reactor.SubscriptionModel
interface which uses
components from project reactor. This interface provide means to subscribe to new events from an EventStore
as they are written. For example:
- Java
- Kotlin
subscriptionModel.subscribe("mySubscriptionId").doOnNext(System.out::println).subscribe();
subscriptionModel.subscribe("mySubscriptionId").doOnNext(::println).subscribe()
This will simply print each cloud event written to the event store to the console.
Note that the signature of subscribe
is defined like this:
public interface SubscriptionModel {
/**
* Stream events from the event store as they arrive. Use this method if want to start streaming from a specific position.
*
* @return A Flux with cloud events which may also includes the SubscriptionPosition that can be used to resume the stream from the current position.
*/
Flux<CloudEvent> subscribe(SubscriptionFilter filter, StartAt startAt);
// Default methods
}
It’s common that subscriptions produce “wrappers” around the vanilla io.cloudevents.CloudEvent
type that includes
the subscription position (if the datastore doesn’t maintain the subscription position on behalf of the clients). Someone, either you as the client or the datastore, needs to keep track of this position
for each individual subscriber (“mySubscriptionId” in the example above). If the datastore doesn’t provide this feature, you should use a SubscriptionModel
implementation that also implement the
org.occurrent.subscription.api.reactor.PositionAwareSubscriptionModel
interface. The PositionAwareSubscriptionModel
is an example of a SubscriptionModel
that returns a wrapper around
io.cloudevents.CloudEvent
called org.occurrent.subscription.PositionAwareCloudEvent
which adds an additional method, SubscriptionPosition getStreamPosition()
, that you can use to get
the current subscription position. You can check if a cloud event contains a subscription position by calling PositionAwareCloudEvent.hasSubscriptionPosition(cloudEvent)
and then get the position by using PositionAwareCloudEvent.getSubscriptionPositionOrThrowIAE(cloudEvent)
.
Note that PositionAwareCloudEvent
is fully compatible with io.cloudevents.CloudEvent
and it’s ok to treat it as such. So given that
you’re subscribing from a PositionAwareSubscriptionModel
, you are responsible for keeping track of the subscription position, so
that it’s possible to resume this subscription from the last known position on application restart. This interface also provides means to get the so called “current global subscription position”,
by calling the globalSubscriptionPosition
method which can be useful when starting a new subscription.
For example, consider the case when subscription “A” starts subscribing at the current time (T1). Event E1 is written to the EventStore
and propagated to subscription “A”. But imagine there’s a bug in “A” that prevents it
from performing its action. Later, the bug is fixed and the application is restarted at the “current time” (T2). But since T2 is after T1, E1 will not sent to “A” again since
it happened before T2. Thus this event is missed! Whether or not this is actually a problem depends on your use case. But to avoid it you should not start the subscription
at the “current time”, but rather from the “global subscription position”. This position should be written to a subscription position storage
before subscription “A” is started. Thus the subscription can continue from this position on application restart and no events will be missed.
Reactive Subscription Filters
You can also provide a subscription filter, applied at the datastore level so that it’s really efficient, if you’re only interested in certain events:
- Java
- Kotlin
subscriptionModel.subscribe("mySubscriptionId", filter(type("GameEnded"))).doOnNext(System.out::println).subscribe();
subscriptionModel.subscribe("mySubscriptionId", filter(type("GameEnded")).doOnNext(::println).subscribe()
This will print each cloud event written to the event store, and has type equal to “GameEnded”, to the console.
The filter
method is statically imported from org.occurrent.subscription.OccurrentSubscriptionFilter
and type
is statically imported from org.occurrent.condition.Condition
.
The OccurrentSubscriptionFilter
is generic and should be applicable to a wide variety of different datastores. However, subscription implementations
may provide different means to express filters. For example, the MongoDB subscription implementations allows you to use filters specific to MongoDB:
- Java
- Kotlin
subscriptionModel.subscribe("mySubscriptionId", filter().id(Filters::eq, "3c0364c3-f4a7-40d3-9fb8-a4a62d7f66e3").type(Filters::eq, "GameStarted")).doOnNext(System.out::println).subscribe();
subscriptionModel.subscribe("mySubscriptionId", filter().id(Filters::eq, "3c0364c3-f4a7-40d3-9fb8-a4a62d7f66e3").type(Filters::eq, "GameStarted")).doOnNext(::println).subscribe()
Now filter
is statically imported from org.occurrent.subscription.mongodb.MongoDBFilterSpecification
and Filters
is imported from
com.mongodb.client.model.Filters
(i.e the normal way to express filters in MongoDB). However, it’s recommended to always start with an OccurrentSubscriptionFilter
and only pick a more specific implementation if you cannot express your filter using the capabilities of OccurrentSubscriptionFilter
.
Reactive Subscription Start Position
A subscription can can be started at different locations in the event store. You can define where to start when a subscription is started. This is done by supplying a
org.occurrent.subscription.StartAt
instance. It provides several ways to specify the start position, either by using StartAt.now()
, StartAt.subscriptionModelDefault()
(default if StartAt
is not defined when
calling the subscribe
function), or StartAt.subscriptionPosition(<subscriptionPosition>)
, where <subscriptionPosition>
is a datastore-specific
implementation of the org.occurrent.subscription.SubscriptionPosition
interface which provides the start position as a String
. You may want to store the
String
returned by a SubscriptionPosition
in a database so that it’s possible to resume a subscription from the last processed position on application restart.
You can do this anyway you like, but for most cases you probably should consider if there’s a Subscription Position Storage
available that suits your needs. If not, you can still have a look at them for inspiration on how to write your own.
Reactive Subscription Position Storage
It’s very common that an application needs to start at its last known location in the subscription stream when it’s restarted. While you’re free to store the subscription position
provided by a reactive subscription any way you like, Occurrent provides an interface
called org.occurrent.subscription.api.reactor.SubscriptionPositionStorage
acts as a uniform abstraction for this purpose. A ReactorSubscriptionPositionStorage
is defined like this:
public interface ReactorSubscriptionPositionStorage {
Mono<SubscriptionPosition> read(String subscriptionId);
Mono<SubscriptionPosition> save(String subscriptionId, SubscriptionPosition subscriptionPosition);
Mono<Void> delete(String subscriptionId);
}
I.e. it’s a way to read/write/delete the SubscriptionPosition
for a given subscription. Occurrent ships one pre-defined reactive implementation (please contribute!):
1. ReactorSubscriptionPositionStorage
Uses the project reactor driver to store SubscriptionPosition
’s in MongoDB.
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>subscription-mongodb-spring-reactor-position-storage</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-spring-reactor-position-storage:0.19.6'
libraryDependencies += "org.occurrent" % "subscription-mongodb-spring-reactor-position-storage" % "0.19.6"
@Grab(group='org.occurrent', module='subscription-mongodb-spring-reactor-position-storage', version='0.19.6')
[org.occurrent/subscription-mongodb-spring-reactor-position-storage "0.19.6"]
'org.occurrent:subscription-mongodb-spring-reactor-position-storage:jar:0.19.6'
<dependency org="org.occurrent" name="subscription-mongodb-spring-reactor-position-storage" rev="0.19.6" />
If you want to roll your own implementation (feel free to contribute to the project if you do) you can depend on the “reactive subscription API” which contains the ReactorSubscriptionPositionStorage
interface:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>subscription-api-reactor</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:subscription-api-reactor:0.19.6'
libraryDependencies += "org.occurrent" % "subscription-api-reactor" % "0.19.6"
@Grab(group='org.occurrent', module='subscription-api-reactor', version='0.19.6')
[org.occurrent/subscription-api-reactor "0.19.6"]
'org.occurrent:subscription-api-reactor:jar:0.19.6'
<dependency org="org.occurrent" name="subscription-api-reactor" rev="0.19.6" />
Reactive Subscription Implementations
These are the non-durable reactive subscription implementations:
MongoDB
By “non-durable” we mean implementations that doesn’t store the subscription position in a durable storage automatically.
It might be that the datastore does this automatically or that subscription position storage is not required
for your use case. If the datastore doesn’t support storing the subscription position automatically, a subscription will typically implement the
org.occurrent.subscription.api.reactor.PositionAwareSubscriptionModel
interface (since these types of subscriptions doesn’t need to be aware of the position).
However, you can do this anyway you like.
Typically, if you want the stream to continue where it left off on application restart you want to store away the subscription position. You can do this anyway you like,
but for most cases you probably want to look into implementations of org.occurrent.subscription.api.reactor.SubscriptionPositionStorage
.
These subscriptions can be combined with a subscription position storage implementation to store the position in a durable
datastore.
Occurrent provides a utility that combines a PositionAwareSubscriptionModel
and
a ReactorSubscriptionPositionStorage
(see here) to automatically store the subscription position
after each processed event. If you don’t want the position to be persisted after every event, it’s recommended to pick a
subscription position storage implementation, and store the position yourself when you find fit.
Reactive Subscription using Spring ReactiveMongoTemplate
An implementation that uses Spring’s ReactiveMongoTemplate for event subscriptions.
First include the following dependency:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>subscription-mongodb-spring-reactor</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:subscription-mongodb-spring-reactor:0.19.6'
libraryDependencies += "org.occurrent" % "subscription-mongodb-spring-reactor" % "0.19.6"
@Grab(group='org.occurrent', module='subscription-mongodb-spring-reactor', version='0.19.6')
[org.occurrent/subscription-mongodb-spring-reactor "0.19.6"]
'org.occurrent:subscription-mongodb-spring-reactor:jar:0.19.6'
<dependency org="org.occurrent" name="subscription-mongodb-spring-reactor" rev="0.19.6" />
Then create a new instance of ReactorMongoSubscriptionModel
and start subscribing:
- Java
- Kotlin
ReactiveMongoTemplate reactiveMongoTemplate = ...
// Create the blocking subscription
SubscriptionModel subscriptionModel = new ReactorMongoSubscriptionModel(reactiveMongoTemplate, "eventCollectionName", TimeRepresentation.RFC_3339_STRING);
// Now you can create subscriptions instances that subscribes to new events as they're written to an EventStore
// Typically you do this after the Spring application context has finished loading. For example by subscribing to
// the (org.springframework.boot.context.event.ApplicationStartedEvent) or in a method annotated with (@PostConstruct)
subscriptionModel.subscribe("mySubscriptionId").flatMap(cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent)).subscribe();
// You can later cancel the subscription by calling:
subscriptionModel.cancelSubscription("mySubscriptionId");
val reactiveMongoTemplate : ReactiveMongoTemplate = ...
// Create the blocking subscription
val subscriptionModel = ReactorMongoSubscriptionModel(reactiveMongoTemplate, "eventCollectionName", TimeRepresentation.RFC_3339_STRING)
// Now you can create subscriptions instances that subscribes to new events as they're written to an EventStore
subscriptionModel.subscribe("mySubscriptionId").flatMap { cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent) }.subscribe()
// You can later cancel the subscription by calling:
subscriptionModel.cancelSubscription("mySubscriptionId")
The “eventCollectionName” specifies the event collection in MongoDB where events are stored. It’s important that this collection is the same as the collection
used by the EventStore
implementation. Secondly, we have the TimeRepresentation.RFC_3339_STRING
that is passed as the third constructor argument, which you can read more about
here. It’s also very important that this is configured the same way as the EventStore
.
It should also be noted that Spring takes care of re-attaching to MongoDB if there’s a connection issue or other transient errors. This can be configured when creating the ReactiveMongoTemplate
instance.
Note that you can provide a filter, start position and position persistence for this subscription implementation.
Durable Subscriptions (Reactive)
Storing the subscription position is useful if you need to resume a subscription from its last known position when restarting an application.
Occurrent provides a utility that combines a PositionAwareSubscriptionModel
and a ReactorSubscriptionPositionStorage
implementation
(see here) to automatically store the subscription position, by default,
after each processed event. If you don’t want the position to be persisted after every event, you can control how often this should happen by supplying a predicate
to ReactorDurableSubscriptionModelConfig
. There’s a pre-defined predicate, org.occurrent.subscription.util.predicate.EveryN
, that allow
the position to be stored for every n event instead of simply every event. There’s also a shortcut, e.g. new ReactorDurableSubscriptionModelConfig(3)
that
creates an instance of EveryN
that stores the position for every third event.
To use it, first to add the following dependency:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>reactor-durable-subscription</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:reactor-durable-subscription:0.19.6'
libraryDependencies += "org.occurrent" % "reactor-durable-subscription" % "0.19.6"
@Grab(group='org.occurrent', module='reactor-durable-subscription', version='0.19.6')
[org.occurrent/reactor-durable-subscription "0.19.6"]
'org.occurrent:reactor-durable-subscription:jar:0.19.6'
<dependency org="org.occurrent" name="reactor-durable-subscription" rev="0.19.6" />
Then we should instantiate a PositionAwareSubscriptionModel
, that subscribes to the events from the event store, and an instance of a ReactorSubscriptionPositionStorage
,
that stores the subscription position, and combine them to a ReactorDurableSubscriptionModel
:
- Java
- Kotlin
// Create the non-durable blocking subscription instance
PositionAwareReactorSubscription nonDurableSubscription = ...
// Create the storage
ReactorSubscriptionPositionStorage storage = ...
// Now combine the non-durable subscription and the subscription position storage
ReactorSubscriptionWithAutomaticPositionPersistence durableSubscription = new ReactorSubscriptionWithAutomaticPositionPersistence(nonDurableSubscription, storage);
// Start a subscription
durableSubscription.subscribe("mySubscriptionId", cloudEvent -> doSomethingWithTheCloudEvent(cloudEvent)).subscribe();
// Create the non-durable blocking subscription instance
val nonDurableSubscription : PositionAwareReactorSubscription = ...
// Create the storage
val storage : ReactorSubscriptionPositionStorage = ...
// Now combine the non-durable subscription and the subscription position storage
val durableSubscription = new BlockingSubscriptionWithAutomaticPositionPersistence(nonDurableSubscription, storage)
// Start a subscription
durableSubscription.subscribe("mySubscriptionId") { cloudEvent ->
doSomethingWithTheCloudEvent(cloudEvent)
}.subscribe()
Decider
As of version 0.17.0, Occurrent has basic support for Deciders. A decider is a model that can be used as a structured way of implementing decision logic for a business entity (typically aggregate) or use case/command. Some benefits of using deciders are:
- You don’t need to implement any folding of events to state yourself.
- You get a good structure for defining your aggregate/use case.
- A decider can return either the new events, the new state, or both events and state (called
Decision
in Occurrent), for a specific command. - Occurrent’s decider implementation supports sending multiple commands to a decider atomically.
- Deciders are also combinable, however this feature is not yet available in Occcurrent.
To use a decider, you need to model your commands as explicit data structures instead of functions.
To create a decider, first include the dependency:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>decider</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:decider:0.19.6'
libraryDependencies += "org.occurrent" % "decider" % "0.19.6"
@Grab(group='org.occurrent', module='decider', version='0.19.6')
[org.occurrent/decider "0.19.6"]
'org.occurrent:decider:jar:0.19.6'
<dependency org="org.occurrent" name="decider" rev="0.19.6" />
You can then either implement the org.occurrent.dsl.decider.Decider
interface or use the default implementation (see more below). The interface is defined like this:
public interface Decider<C, S, E> {
S initialState();
@NotNull
List<E> decide(@NotNull C command, S state);
S evolve(S state, @NotNull E event);
default boolean isTerminal(S state) {
return false;
}
}
where:
Parameter Type | Description |
---|---|
C | The type of the commands that the decider can handle |
S | The state that the decider works with |
E | The type of events that the decider returns |
The interface contains four methods:
Method name | Description |
---|---|
initialState | Returns the initial state of the decider, for example null or something like “NotStarted ” (a domain specific state implemented by you), depending on your domain |
decide | A function that takes a command and the current state and returns a list of new events that represents the changes the occurred after the commands was handled |
evolve | A method that takes the current state and an event, and return an update state after applying this event |
isTerminal | An optional method that can be implemented/overridden to tell the Decider to stop evolving the state if the Decider has reached a specific state |
It’s highly recommended to read this blog post to get a better understanding of the rationale behind Deciders.
But you don’t actually need to implement this interface yourself, instead you can create a default implementation by passing in functions to Decider.create(..)
.
Imagine that you have commands, events and state defined like this:
- Java
- Kotlin
sealed interface Command {
record Command1(String something, String message) implements Command {
}
record Command2(String message) implements Command {
}
}
sealed interface Event {
record Event1(String something) implements Command {
}
record Event2(String somethingElse) implements Command {
}
record Event3(String message) implements Command {
}
}
record State(String something, String somethingElse, String message) {
// Other constructors excluded for brevity
}
sealed interface Command {
data class Command1(val something : String, val message : String) : Command
data class Command2(val message : String) : Command
}
sealed interface Event {
data class Event1(val something : String)
data class Event2(val somethingElse : String)
data class Event3(val message : String)
}
data class State(val something : String, val somethingElse : String, val message : String) {
// Other constructors excluded for brevity
}
Then you can create a decider like this:
- Java
- Kotlin
// This example uses Java 21+
var decider = Decider.<Command, State, Event>create(
null,
(command, state) -> switch (command) {
case Command1 c1 -> {
if (s == null) {
yield List.of(new Event1(c1.something()));
} else {
yield List.of(new Event3(c1.message()));
}
}
case Command2 c2 -> List.of(new MyEvent2(c2.somethingElse()));
},
(state, event) -> switch (event) {
case Event1 e1 -> new State(e1.something());
case Event2 e2 -> new State(s.something(), e2.message());
case Event3 e3 -> new State(s.something(), e3.somethingElse(), s.message());
}
);
// You can pass an optional Predicate as a fourth argument to Decider.create(..) if you like to specify the "isTerminal" condition, otherwise it always returns false by default.
// Importing this extension function makes creating deciders nicer from Kotlin
import org.occurrent.dsl.decider.decider
val decider = decider<Command, State?, Event>(
initialState = null,
decide = { cmd, state ->
when (cmd) {
is Command1 -> listOf(if (cmd == null) Event1(c1.something()) else Event3(c1.message()))
is Command2 -> listOf(MyEvent2(c2.somethingElse()))
}
},
evolve = { _, e ->
when (e) {
is Event1 -> State(e1.something())
is Event2 -> State(s.something(), e2.message())
is Event3 -> State(s.something(), e3.somethingElse(), s.message())
}
}
)
// You can also, optionally, define an "isTerminal" predicate as a fourth argument to the decider(..) function if you need to specify this condition, otherwise it always returns false by default.
Now that you have an instance of Decider
, you can then call any of the many default methods to return either the name state, the new events, or both. For example:
- Java
- Kotlin
// If you're using an event store to store the events, you can do like this:
List<Event> currentEvents = ...
Command command = ..
List<Event> newEvents = decider.decideOnEventsAndReturnEvents(currentEvents, command);
State newState = decider.decideOnEventsAndReturnState(currentEvents, command);
// Return both the state and the new events
Decision<State, List<Event>> decision = decider.decideOnEvents(currentEvents, command);
// Or if you store state instead of events:
State currentState = ...
Command command = ..
List<Event> newEvents = decider.decideOnStateAndReturnEvents(currentState, command);
State newState = decider.decideOnStateAndReturnState(currentState, command);
// Return both the state and the new events
Decision<State, List<Event>> decision = decider.decideOnState(currentState, command);
// You can even apply multiple commands at the same time:
List<Event> currentEvents = ...
Command command1 = ..
Command command2 = ..
// Both commands will be applied atomically
List<Event> newEvents = decider.decideOnEventsAndReturnEvents(currentEvents, command1, command2);
// Import the Kotlin extension functions
import org.occurrent.dsl.decider.decide
import org.occurrent.dsl.decider.component1
import org.occurrent.dsl.decider.component2
val currentEvents : List<Event> = ...
val currentState : State? = ...
val command : Command = ...
// We use destructuring here to get the "newState" and "newEvents" from the Decision instance returned by decide
// This is the reason for importing the "component1" and "component2" extension functions above
val (newState, newEvents) = decider.decide(events = currentEvents, command = command)
// You can also start the computation based on the current state
val (newState, newEvents) = decider.decide(state = currentState, command = command)
// And you could of course also just use the actual "Decision" if you like
val decision : Decision<State, List<Event>> = decider.decide(events = currentEvents, command = command)
// You can also supply multiple commands at the same time, then all of them will succeed or fail atomically
val (newState, newEvents) = decider.decide(currentState, command1, command2)
// You can also use the Java <code>decide</code> methods, such as <code>decideOnStateAndReturnEvent</code>, from Kotlin, but usually it's enough to just use the <code>org.occurrent.dsl.decider.decide</code> extension function.
Using an ApplicationService with Decider’s
It’s possible to integrate Decider’s with an ApplicationService to easily load existing events from an event store.
Java
To use the existing ApplicationService infrastructure with Deciders from Java, you can do like this:
ApplicationService<Event> applicationService = ...
Command command = ...
// Because the decider expects a List<Event>, and not Stream<Event> as expected by the ApplicationService,
// we first convert the Stream to a List using the "toStreamCommand" function provided by Occurrent.
var writeResult = applicationService.execute("streamId", toStreamCommand(events -> decider.decideOnEventsAndReturnEvents(events, defineName)));
toStreamCommand
can be statically imported from org.occurrent.application.composition.command.toStreamCommand
.
Kotlin
The org.occurrent:decider
module contains Kotlin extension functions, located in the org.occurrent.dsl.decider.ApplicationServiceDeciderExtensions.kt
file, that allows you to easily integrate deciders
with existing ApplicationService infrastructure. Here’s an example:
import org.occurrent.dsl.decider.execute
// Create the application service and decider
val applicationService = ...
val decider = ...
// Then you can pass the decider and command to the application service instance
val writeResult = applicationService.execute(streamId, command, decider)
It’s also possible to return the decision, state or new events when calling execute:
import org.occurrent.dsl.decider.executeAndReturnDecision
import org.occurrent.dsl.decider.executeAndReturnState
import org.occurrent.dsl.decider.executeAndReturnEvents
// Invoke the decider with the command and return both state and new events (decision)
val decision = applicationService.executeAndReturnDecision(streamId, command, decider)
// Invoke the decider with the command and return the new state
val state = applicationService.executeAndReturnState(streamId, command, decider)
// Invoke the decider with the command and return the new events
val newEvents = applicationService.executeAndReturnEvents(streamId, command, decider)
Retry
Retry Configuration (Blocking)
Occurrent contains a retry module that you can depend on using:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>retry</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:retry:0.19.6'
libraryDependencies += "org.occurrent" % "retry" % "0.19.6"
@Grab(group='org.occurrent', module='retry', version='0.19.6')
[org.occurrent/retry "0.19.6"]
'org.occurrent:retry:jar:0.19.6'
<dependency org="org.occurrent" name="retry" rev="0.19.6" />
Occurrent components that support retry (subscription model and subscription position storage implementations)
typically accepts an instance of org.occurrent.retry.RetryStrategy
to their constructors. This allows you to configure how they should do retry. You can configure max attempts,
a retry predicate, error listener, before/after retry listener, as well as the backoff strategy. Here’s an example:
RetryStrategy retryStrategy = RetryStrategy
.exponentialBackoff(Duration.ofMillis(100), Duration.ofSeconds(2), 2.0)
.retryIf(UncategorizedSQLException.class::isInstance)
.maxAttempts(5)
.onBeforeRetry(throwable -> log.warn("Caught exception {}, will retry.", throwable.getClass().getSimpleName()))
.onError((info, throwable) -> if(info.isLastAttempt()) log.error("Ended with exception {}.", throwable.getClass().getSimpleName()));
You can then use a RetryStrategy
instance to call methods that you want to be retried on exception by using the execute
method:
RetryStrategy retryStrategy = ..
// Retry the method if it throws an exception
retryStrategy.execute(Something::somethingThing);
RetryStrategy
is immutable, which means that you can safely do things like this:
RetryStrategy retryStrategy = RetryStrategy.retry().fixed(200).maxAttempts(5);
// Uses default 200 ms fixed delay
retryStrategy.execute(() -> Something.something());
// Use 600 ms fixed delay
retryStrategy.backoff(fixed(600)).execute(() -> SomethingElse.somethingElse());
// 200 ms fixed delay again
retryStrategy.execute(() -> Thing.thing());
You can also disable retries by calling RetryStartegy.none()
.
As of version 0.11.0 you can also use the mapRetryPredicate
function easily allows you to map the current retry predicate into a new one. This is useful if you e.g. want to add a predicate to the existing predicate. For example:
// Let's say you have a retry strategy:
Retry retry = RetryStrategy.exponentialBackoff(Duration.ofMillis(100), Duration.ofSeconds(2), 2.0f).maxAttempts(5).retryIf(WriteConditionNotFulfilledException.class::isInstance);
// Now you also want to retry if an IllegalArgumentException is thrown:
retry.mapRetryPredicate(currentRetryPredicate -> currentRetryPredicate.or(IllegalArgument.class::isInstance))
As of version 0.16.3, RetryStrategy
now accepts a function that takes an instance of org.occurrent.retry.RetryInfo
. This is useful if you need to know the current state of your of the retry while retrying. For example:
RetryStrategy retryStrategy = RetryStrategy
.exponentialBackoff(initialDelay, maxDelay, 2.0)
.maxAttempts(10)
retryStrategy.execute(info -> {
if (info.getNumberOfAttempts() > 2 && info.getNumberOfAttempts() < 6) {
System.out.println("Number of attempts is between 3 and 5");
}
...
});
DSL’s
Subscription DSL
The subscription DSL is a utility that you can use to easier create subscriptions by using a CloudEventConverter.
There’s a both a Kotlin DSL and Java DSL. First you need to depend on the subscription-dsl
module:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>subscription-dsl-blocking</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:subscription-dsl-blocking:0.19.6'
libraryDependencies += "org.occurrent" % "subscription-dsl-blocking" % "0.19.6"
@Grab(group='org.occurrent', module='subscription-dsl-blocking', version='0.19.6')
[org.occurrent/subscription-dsl-blocking "0.19.6"]
'org.occurrent:subscription-dsl-blocking:jar:0.19.6'
<dependency org="org.occurrent" name="subscription-dsl-blocking" rev="0.19.6" />
If you’re using Kotlin you can then define subscriptions like this:
val subscriptionModel = SpringMongoSubscriptionModel(..)
val cloudEventConverter = GenericCloudEventConverter<DomainEvent>(..)
subscriptions(subscriptionModel, cloudEventConverter) {
subscribe<GameStarted>("id1") { gameStarted ->
log.info("Game was started $gameStarted")
}
subscribe<GameWon, GameLost>("id2") { domainEvent ->
log.info("Game was either won or lost: $domainEvent")
}
subscribe("everything") { domainEvent ->
log.info("I subscribe to every event: $domainEvent")
}
}
Note that as of version 0.6.0 you can also do:
subscribe<GameStarted> { gameStarted ->
log.info("Game was started $gameStarted")
}
i.e. you don’t need to specify an id explicitly. Be careful here though, since the name of the subscription will be generated from the event name (the unqualified name, in this case the subscription id would be “GameStarted”). This can lead to trouble if you rename your event because then the id of your subscription will change as well, and it won’t continue from the previous position in the subscription position storage.
If using Java you can do:
SpringMongoSubscriptionModel subscriptionModel = SpringMongoSubscriptionModel(..);
GenericCloudEventConverter cloudEventConverter = GenericCloudEventConverter<DomainEvent>(..);
Subscriptions<DomainEvent> subscriptions = new Subscriptions<DomainEvent>(subscriptionModel, cloudEventConverter);
subscriptions.subscribe("gameStarted", GameStarted.class, gameStarted -> {
log.info("Game was started {}", gameStarted)
});
For this to work, your domain events must all “implement” a DomainEvent
interface (or a sealed class in Kotlin). Note that DomainEvent
is something you create yourself,
it’s not something that is provided by Occurrent.
As of version 0.17.0 you can also get metadata (such as stream version, stream id and all other cloud event extension properties) when consuming an event:
- Java
- Kotlin
subscriptions.subscribe("gameStarted", GameStarted.class, (metadata, gameStarted) -> {
long streamVersion = metadata.getStreamVersion();
String streamId = metadata.getStreamId();
Map<String, Object> allData = metadata.getData();
var custom = allData.get("extensionPropertyDefinedInCloudEvent");
// Do stuff
});
subscribe<GameStarted> { metadata, event ->
val streamVersion = metadata.streamVersion
val streamId = metadata.streamId
val allData = metadata.data
val custom = allData["extensionPropertyDefinedInCloudEvent"]
// Do stuff
}
Query DSL
The “Query DSL” (or “domain query DSL”) is a small wrapper around the EventStoreQueries API that lets you query for domain events instead of CloudEvents.
Depend on the org.occurrent:query-dsl-blocking
module and create an instance of org.occurrent.dsl.query.blocking.DomainEventQueries
. For example:
EventStoreQueries eventStoreQueries = ..
CloudEventConverter<DomainEvent> cloudEventConverter = ..
DomainEventQueries<DomainEvent> domainEventQueries = new DomainEventQueries<DomainEvent>(eventStoreQueries, cloudEventConverter);
Stream<DomainEvent> events = domainQueries.query(Filter.subject("someSubject"));
There’s also support for skip, limits and sorting and convenience methods for querying for a single event:
Stream<DomainEvent> events = domainQueries.query(GameStarted.class, GameEnded.class); // Find only events of this type
GameStarted event1 = domainQueries.queryOne(GameStarted.class); // Find the first event of this type
GamePlayed event2 = domainQueries.queryOne(Filter.id("d7542cef-ac20-4e74-9128-fdec94540fda")); // Find event with this id
There are also some Kotlin extensions that you can use to query for a Sequence
of events instead of a Stream
:
val events : Sequence<DomainEvent> = domainQueries.queryForSequence(GamePlayed::class, GameWon::class, skip = 2) // Find only events of this type and skip the first two events
val event1 = domainQueries.queryOne<GameStarted>() // Find the first event of this type
val event2 = domainQueries.queryOne<GamePlayed>(Filter.id("d7542cef-ac20-4e74-9128-fdec94540fda")) // Find event with this id
Spring Boot Starter
Use the “Spring Boot Starter” project to bootstrap Occurrent quickly if using Spring. Add the following to your build script:
- Maven
- Gradle
- SBT
- Grape
- Leiningen
- Buildr
- Ivy
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>spring-boot-starter-mongodb</artifactId>
<version>0.19.6</version>
</dependency>
compile 'org.occurrent:spring-boot-starter-mongodb:0.19.6'
libraryDependencies += "org.occurrent" % "spring-boot-starter-mongodb" % "0.19.6"
@Grab(group='org.occurrent', module='spring-boot-starter-mongodb', version='0.19.6')
[org.occurrent/spring-boot-starter-mongodb "0.19.6"]
'org.occurrent:spring-boot-starter-mongodb:jar:0.19.6'
<dependency org="org.occurrent" name="spring-boot-starter-mongodb" rev="0.19.6" />
Next create a Spring Boot application annotated with @SpringBootApplication
as you would normally do, and also add the @EnableOccurrent
annotation (located in package org.occurrent.springboot.mongo.blocking
).
Occurrent will then configure the following components automatically:
- A Spring MongoDB Event Store instance (
EventStore
) - A Spring
SubscriptionPositionStorage
instance - A durable Spring MongoDB competing consumer subscription model (
SubscriptionModel
) - A Jackson-based
CloudEventConverter
. It uses a reflection based cloud event type mapper that uses the fully-qualified class name as cloud event type (you should absolutely override this bean for production use cases). You can do this, for example, by doing:@Bean public CloudEventTypeMapper<GameEvent> cloudEventTypeMapper() { return ReflectionCloudEventTypeMapper.simple(GameEvent.class); }
This will use the “simple name” (via reflection) of a domain event as the cloud event type. But since the package name is now lost from the cloud event type property, the
ReflectionCloudEventTypeMapper
will append the package name ofGameEvent
when converting back into a domain event. This only works if all your domain events are located in the exact same package asGameEvent
. If this is not the case you need to implement a more advancedCloudEventTypeMapper
such asclass CustomTypeMapper : CloudEventTypeMapper<GameEvent> { override fun getCloudEventType(type: Class<out GameEvent>): String = type.simpleName override fun <E : GameEvent> getDomainEventType(cloudEventType: String): Class<E> = when (cloudEventType) { GameStarted::class.simpleName -> GameStarted::class GamePlayed::class.simpleName -> GamePlayed::class // Add all other events here!! ... else -> throw IllegalStateException("Event type $cloudEventType is unknown") }.java as Class<E> }
or implement your own custom CloudEventConverter.
- A
GenericApplication
instance (ApplicationService
) - A subscription dsl instance (
Subscriptions
) - A query dsl instance (
DomainQueries
) - Support for annotations
You can of course override other beans as well to tailor them to your needs. See the source code of org.occurrent.springboot.mongo.blocking.OccurrentMongoAutoConfiguration if you want to know exactly what gets configured automatically.
It’s also possible to configure certain aspects from the application.yaml
file under the “occurrent” namespace.
For example:
occurrent:
application-service:
enable-default-retry-strategy: false
You can code-complete the available properties in Intellij or have a look at org.occurrent.springboot.mongo.blocking.OccurrentProperties to find which configuration properties that are supported.
Spring Boot Annotations
If using the Spring Boot Starter you have the option to start subscriptions using the @Subscription
annotation (org.occurrent.annotation.Subscription
).
For example if you have a domain event like this:
- Java
- Kotlin
public sealed interface DomainEvent permits DomainEvent1, DomainEvent2, DomainEvent3 {
UUID eventId();
ZonedDateTime timestamp();
}
public record DomainEvent1(UUID eventId, ZonedDateTime timestamp, String someData1) implements DomainEvent {
}
public record DomainEvent2(UUID eventId, ZonedDateTime timestamp, String someData2) implements DomainEvent {
}
public record DomainEvent3(UUID eventId, ZonedDateTime timestamp, String someData3) implements DomainEvent {
}
sealed interface DomainEvent {
val eventId: UUID
val timestamp: ZonedDateTime
}
data class DomainEvent1( override val eventId: UUID, override val timestamp: ZonedDateTime, val someData1: String ) : DomainEvent
data class DomainEvent2( override val eventId: UUID, override val timestamp: ZonedDateTime, val someData2: String ) : DomainEvent
data class DomainEvent3( override val eventId: UUID, override val timestamp: ZonedDateTime, val someData3: String ) : DomainEvent
you can create a subscription to all events like this:
- Java
- Kotlin
@Component
public class Example {
@Subscription(id = "printAllDomainEvents")
void printAllDomainEvents(DomainEvent e) {
System.out.println("Received domain event %s".formatted(e));
}
}
@Component
class Example {
@Subscription(id = "printAllDomainEvents")
fun printAllDomainEvents(e: DomainEvent) {
println("Received domain event $e")
}
}
Note that subscriptions started by the Subscription
annotation will make use of competing consumers so that if you run multiple instances of the application one of them will receive the event(s).
Subscription Start Position
When creating a subscription using the @Subscription
annotation you can specify how it should behave when first starting (creating) the subscription
as well as how it should be resumed when the application is restarted. Here’s an example:
- Java
- Kotlin
@Component
public class Example {
@Subscription(id = "printAllDomainEvents", startAt = StartPosition.BEGINNING_OF_TIME)
void printAllDomainEvents(DomainEvent e) {
System.out.println("Received domain event %s".formatted(e));
}
}
@Component
class Example {
@Subscription(id = "printAllDomainEvents", startAt = StartPosition.BEGINNING_OF_TIME)
fun printAllDomainEvents(e: DomainEvent) {
println("Received domain event $e")
}
}
This will first replay all historic events from the beginning of time and then continue subscribing to new events continuously. You can also start at a specific date by using startAtISO8601()
or startAtTimeEpochMillis()
.
Note that the example above will start replaying historic events from the beginning of time when the subscription is started the first time. However, once the subscription is resumed, e.g. on application restart, it’ll continue from the last received event.
Here’s a description for each StartPosition:
StartPosition | Description |
---|---|
BEGINNING_OF_TIME |
Start this subscription from the first event in the event store. |
NOW |
Start this subscription from current time. |
DEFAULT |
Start this subscription using the default behavior of the subscription model. Typically, this means that it’ll start from NOW , unless the subscription has already been started before, in which case the subscription will be started from its last known position. |
If you want a different behavior when the application is restarted, configure a different resumeBehavior()
(@Subscription(id="mySubscription", resumeBehavior=..)
):
Resume behavior:
ResumeBehavior | Description |
---|---|
DEFAULT |
Use the default resume behavior of the underlying subscription model. For example, if the StartPosition is set to StartPosition.BEGINNING_OF_TIME , and ResumeBehavior is set to ResumeBehavior.DEFAULT , then the subscription will start from the beginning of time the first time it’s run. Then, on application restart, it’ll continue from the last received event (the subscription position (checkpoint) for the subscription) on restart. |
SAME_AS_START_AT |
Always start at the same position as specified by the StartPosition . I.e., even if there’s a subscription position (checkpoint) stored for the subscription, it’ll be ignored on application restart and the subscription will resume from the specified StartPosition . |
The combination of start and resume behavior can enable various use cases. For example:
StartPostion | ResumeBehavior | Use Case |
---|---|---|
BEGINNING_OF_TIME |
DEFAULT |
Start a new subscription from “beginning of time”, but when the app is restarted, continue from the last processed event (i.e. don’t replay all historic events again). |
BEGINNING_OF_TIME |
SAME_AS_START_AT |
The subscription will always (even on restart) subscribe to all historic events, effectively making this an in-memory subscription. Note that this subscription will be called on each instance of the application event though competition consumer behavior is configured. |
NOW |
SAME_AS_START_AT |
The subscription will always (even on restart) subscribe to events from “now”, ignoring the historic events. This will effectively make this an in-memory subscription. Note that this subscription will be called on each instance of the application event though competition consumer behavior is configured. |
NOW |
DEFAULT |
The subscription start subscribing to events from “now” when created, but continue (resume) from the last received event on restart. |
DEFAULT |
DEFAULT |
Same as above (this is the default behavior if start position and resume behavior is not specified). |
Selective Events
If DomainEvent
is a sealed interface/class (as in the examples above), then all events implementing this interface/class will be received when subscribing to this event.
You can of course subscribe to an individual event, such as DomainEvent2
. But if you want to receive only some of the events that implement the DomainEvent
interface, you can use eventTypes()
.
For example, if you want to subscribe on both DomainEvent1
and DomainEvent3
but handle them as a DomainEvent
:
- Java
- Kotlin
@Component
public class Example {
@Subscription(id = "printAllDomainEvents", eventTypes = {DomainEvent1.class, DomainEvent3.class})
void printSomeDomainEvents(DomainEvent e) {
System.out.println("Received any of the specified domain events: %s".formatted(e));
}
}
@Component
class Example {
@Subscription(id = "printAllDomainEvents", eventTypes = [DomainEvent1::class, DomainEvent3::class])
fun printSomeDomainEvents(e: DomainEvent) {
println("Received any of the specified domain events: $e")
}
}
Event Metadata
Sometimes it can be useful to get the metadata associated with the received event. For this reason, you can add a parameter to the method annotated with @Subscription
of type org.occurrent.dsl.subscription.blocking.EventMetadata
.
It’ll contain all extension properties added to the CloudEvent, for example:
- Java
- Kotlin
@Component
public class Example {
@Subscription(id = "printAllDomainEvents")
void printAllDomainEvents(DomainEvent e, EventMetadata metadata) {
String streamId = metadata.getStreamId();
long streamVersion = metadata.getStreamVersion();
Object myCustomValue = metadata.get("MyCustomValue");
...
}
}
@Component
class Example {
@Subscription(id = "printAllDomainEvents")
fun printAllDomainEvents(e: DomainEvent, metadata: EventMetadata) {
val streamId: String = metadata.streamId
val streamVersion: Long = metadata.streamVersion
val myCustomValue: Any? = metadata["MyCustomValue"]
// ...
}
}
Subscription Startup Mode
You can configure whether the subscription should start before the application is ready to receive requests. For example, it might be very important that a certain subscription is started before the first web request comes in:
- Java
- Kotlin
@Component
public class Example {
@Subscription(id = "printAllDomainEvents", startupMode = StartupMode.WAIT_UNTIL_STARTED)
void printAllDomainEvents(DomainEvent e) {
System.out.println("Received domain event %s".formatted(e));
}
}
@Component
class Example {
@Subscription(id = "printAllDomainEvents", startupMode = StartupMode.WAIT_UNTIL_STARTED)
fun printAllDomainEvents(e: DomainEvent) {
println("Received domain event $e")
}
}
In other cases, such as when replaying a huge number of historic events it might be better to let the application start and let the processing of historic events happen in the background.
- Java
- Kotlin
@Component
public class Example {
@Subscription(id = "printAllDomainEvents", startAt = StartPosition.BEGINNING_OF_TIME, startupMode = StartupMode.BACKGROUND)
void printAllDomainEvents(DomainEvent e) {
System.out.println("Received domain event %s".formatted(e));
}
}
@Component
class Example {
@Subscription(id = "printAllDomainEvents", startAt = StartPosition.BEGINNING_OF_TIME, startupMode = StartupMode.BACKGROUND)
fun printAllDomainEvents(e: DomainEvent) {
println("Received domain event $e")
}
}
Here’s a summary of the different startup modes:
StartupMode | Description |
---|---|
DEFAULT |
Determine the startup mode based on the properties of the subscription (such as startAt() and resumeBehavior() ). It’ll use BACKGROUND if the subscription needs to replay historic events before subscribing to new ones (e.g. if startAt() is StartPosition.BEGINNING_OF_TIME ), otherwise WAIT_UNTIL_STARTED will be used. |
WAIT_UNTIL_STARTED |
The subscription will wait until it’s started up fully before Spring continues starting the rest of the application. Most of the time this is recommended because otherwise there could be a small chance that a request is received by your application before the subscription has bootstrapped completely. This can lead to the subscription missing this event. This is only true if the subscription is brand new. As soon as the subscription has received an event that is stored in a org.occurrent.subscription.api.blocking.SubscriptionPositionStorage (checkpointing), it’ll never miss an event during startup. |
BACKGROUND |
The subscription will NOT wait until it’s started up fully before Spring continues starting the rest of the application; instead, it will be started in the background. Typically, this is mainly useful if you instruct the subscription to start at an earlier date (such as the beginning of time), and you have a lot of events to read before the subscription has caught up. In this case, you may wish to start the Spring application before the subscription has fully started (i.e., before all historic events have been replayed) because waiting for all events to replay takes too long. The subscription will then replay all historic events in the background before switching to continuous mode. |
Blogs
Johan has created a couple of blog-posts on Occurrent on his blog:
Contact & Support
Would you like to contribute to Occurrent? That’s great! You should join the mailing-list or contribute on github. The mailing-list can also be used for support and discussions.
Credits
Thanks to Per Ökvist for discussions and ideas, and David Åse for letting me fork the awesome Javalin website. Credits should also go to Alec Henninger for his work on competing consumer support for MongoDB change streams.