Occurrent 0.9.0 is now available with several new features, bug fixes and improvements. Most notable is the competing consumer support. A competing consumer subscription model wraps another subscription model to allow several subscribers to subscribe to the same subscription. One of the subscribers will get a lock on the subscription and receive events from it. If a subscriber loses its lock, another subscriber will take over automatically.

Another big change is that a new interface, org.occurrent.subscription.api.blocking.Subscribable, has been introduced that defines all “subscribe” methods. SubscriptionModel now extends this interface as well as org.occurrent.subscription.api.blocking.SubscriptionModelLifeCycle. In application code that only wants to start subscriptions, you probably want to depend on the new interface and not SubscriptionModel. There are other non-backward compatible changes as well, so make sure to read the changelog below before upgrading.
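
As a rough illustration of why depending on the narrower interface can help, the sketch below uses simplified stand-in interfaces that borrow only the names mentioned in this release. The method signatures, the `String` event type, and the `OrderNotifier` and `InMemoryModel` classes are assumptions made for the example, not Occurrent's actual API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SubscribableSketch {

    // Simplified stand-ins: the real Occurrent interfaces have richer signatures.
    interface Subscribable {
        void subscribe(String subscriptionId, Consumer<String> action);
    }

    interface SubscriptionModelLifeCycle {
        void start();
        void stop();
    }

    interface SubscriptionModel extends Subscribable, SubscriptionModelLifeCycle {
    }

    // Hypothetical application component: it only starts subscriptions,
    // so it asks for Subscribable and has no way to stop or restart the model.
    static class OrderNotifier {
        final List<String> seen = new ArrayList<>();

        OrderNotifier(Subscribable subscribable) {
            subscribable.subscribe("order-notifier", seen::add);
        }
    }

    // Minimal in-memory model used only to exercise the sketch.
    static class InMemoryModel implements SubscriptionModel {
        private final List<Consumer<String>> actions = new ArrayList<>();
        private boolean running = true;

        @Override
        public void subscribe(String subscriptionId, Consumer<String> action) {
            actions.add(action);
        }

        @Override
        public void start() {
            running = true;
        }

        @Override
        public void stop() {
            running = false;
        }

        // Deliver an event to every registered action while the model is running.
        void publish(String event) {
            if (running) {
                actions.forEach(action -> action.accept(event));
            }
        }
    }

    public static void main(String[] args) {
        InMemoryModel model = new InMemoryModel();
        OrderNotifier notifier = new OrderNotifier(model);
        model.publish("OrderPlaced");
        System.out.println(notifier.seen);
    }
}
```

Because `OrderNotifier` receives only the subscribe methods, life cycle control stays with whichever component owns the full model.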
- Fixed a bug in InMemorySubscription that accidentally pushed null values to subscriptions every 500 milliseconds unless an actual event was received.
- Upgraded to Kotlin 1.4.31
- All blocking subscriptions now implement the life cycle methods defined in the org.occurrent.subscription.api.blocking.SubscriptionModelLifeCycle interface. A new interface, org.occurrent.subscription.api.blocking.Subscribable, has been defined that contains all “subscribe” methods. You can use this interface in your application if all you want to do is start subscriptions.
- Introduced a new default “StartAt” implementation called “default” (StartAt.subscriptionModelDefault()). This differs from StartAt.now() in that it allows the subscription model to choose where to start automatically if you don’t want to start at an earlier position.
- Removed the ability to pass a supplier returning StartAt to the subscribe methods in the org.occurrent.subscription.api.blocking.Subscribable interface. Instead, use StartAt.dynamic(supplier) to achieve the same result.
- Upgraded to CloudEvents Java SDK 2.0.0
- Waiting for the internal message listener to be shut down when stopping
- Using an org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor as the executor in SpringMongoSubscriptionModel instead of the default org.springframework.core.task.SimpleAsyncTaskExecutor. The reason for this is that the DefaultMessageListenerContainer used internally in SpringMongoSubscriptionModel will wait for all threads in the ThreadPoolTaskExecutor to stop when stopping the SpringMongoSubscriptionModel instance. Otherwise, race conditions can occur when stopping and then immediately starting a
Introducing competing consumer support! A competing consumer subscription model wraps another subscription model to allow several subscribers to subscribe to the same subscription. One of the subscribers will get a lock on the subscription and receive events from it. If a subscriber loses its lock, another subscriber will take over automatically. To achieve distributed locking, the subscription model uses an org.occurrent.subscription.api.blocking.CompetingConsumerStrategy to support different algorithms. You can write custom algorithms by implementing this interface yourself.

Here’s an example of how to use the CompetingConsumerSubscriptionModel. First add the org.occurrent:competing-consumer-subscription module to the classpath. This example uses the org.occurrent:subscription-mongodb-native-blocking-competing-consumer-strategy module. It also wraps the DurableSubscriptionModel, which in turn wraps the Native MongoDB subscription model.

    MongoDatabase mongoDatabase = mongoClient.getDatabase("some-database");
    SubscriptionPositionStorage positionStorage = new NativeMongoSubscriptionPositionStorage(mongoDatabase, "position-storage");
    SubscriptionModel wrappedSubscriptionModel = new DurableSubscriptionModel(new NativeMongoSubscriptionModel(mongoDatabase, "events", TimeRepresentation.DATE), positionStorage);

    // Create the CompetingConsumerSubscriptionModel
    NativeMongoLeaseCompetingConsumerStrategy competingConsumerStrategy = NativeMongoLeaseCompetingConsumerStrategy.withDefaults(mongoDatabase);
    CompetingConsumerSubscriptionModel competingConsumerSubscriptionModel = new CompetingConsumerSubscriptionModel(wrappedSubscriptionModel, competingConsumerStrategy);

    // Now subscribe!
    competingConsumerSubscriptionModel.subscribe("subscriptionId", type("SomeEvent"));

If the above code is executed on multiple nodes/processes, then only one subscriber will receive events.
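
To make the "one subscriber holds the lock, another takes over" behaviour more concrete, here is a minimal, self-contained sketch of a lease-based lock. It is not how Occurrent or NativeMongoLeaseCompetingConsumerStrategy is implemented: the `LeaseRegistry` class, its method names, and the in-memory map are assumptions made purely for illustration, and a real strategy would keep the lease in shared storage such as MongoDB. Time is passed in explicitly so the example is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

public class LeaseSketch {

    // Hypothetical in-memory lease table, standing in for shared storage.
    static class LeaseRegistry {

        private static final class Lease {
            final String owner;
            final long expiresAtMillis;

            Lease(String owner, long expiresAtMillis) {
                this.owner = owner;
                this.expiresAtMillis = expiresAtMillis;
            }
        }

        private final Map<String, Lease> leases = new HashMap<>();
        private final long leaseMillis;

        LeaseRegistry(long leaseMillis) {
            this.leaseMillis = leaseMillis;
        }

        // Acquire or renew the lease. Succeeds if the lease is free,
        // has expired, or is already held by the same subscriber.
        synchronized boolean tryAcquire(String subscriptionId, String subscriberId, long nowMillis) {
            Lease current = leases.get(subscriptionId);
            boolean available = current == null
                    || current.expiresAtMillis <= nowMillis
                    || current.owner.equals(subscriberId);
            if (available) {
                leases.put(subscriptionId, new Lease(subscriberId, nowMillis + leaseMillis));
            }
            return available;
        }

        // Give the lease up voluntarily, for example on shutdown.
        synchronized void release(String subscriptionId, String subscriberId) {
            Lease current = leases.get(subscriptionId);
            if (current != null && current.owner.equals(subscriberId)) {
                leases.remove(subscriptionId);
            }
        }
    }

    public static void main(String[] args) {
        LeaseRegistry registry = new LeaseRegistry(1_000);
        // node-a is first, so it gets the lease.
        System.out.println(registry.tryAcquire("subscriptionId", "node-a", 0));
        // node-b is refused while node-a's lease is still valid.
        System.out.println(registry.tryAcquire("subscriptionId", "node-b", 500));
        // node-a never renewed, so the lease has expired and node-b takes over.
        System.out.println(registry.tryAcquire("subscriptionId", "node-b", 1_500));
    }
}
```

In this sketch, a subscriber that keeps calling `tryAcquire` before its lease expires stays the active consumer. One that crashes or stops renewing lets the lease lapse, at which point any other subscriber can claim it.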