Occurrent 0.9.0 is now available with several new features, bug fixes and improvements. Most notable is the competing consumer support.
A competing consumer subscription model wraps another subscription model to allow several subscribers to subscribe to the same subscription. One of the subscribers will get a lock on the subscription and receive events from it. If a subscriber loses its lock, another subscriber will take over automatically.

Another big change is the new interface org.occurrent.subscription.api.blocking.Subscribable, which defines all “subscribe” methods. A SubscriptionModel now extends this interface as well as org.occurrent.subscription.api.blocking.SubscriptionModelLifeCycle. In application code that only wants to start subscriptions, you probably want to depend on the Subscribable interface and not SubscriptionModel. There are other non-backward compatible changes as well, so make sure to read the changelog below before upgrading.
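
The idea of depending on the narrower interface can be sketched in plain Java. The types below (DemoSubscribable, DemoLifeCycle, DemoSubscriptionModel, InMemoryDemoModel, OrderProjection) are hypothetical stand-ins invented for this illustration, and their method signatures are assumptions; they are not Occurrent's real interfaces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-ins for illustration only; NOT Occurrent's real interfaces or signatures.
interface DemoSubscribable {
    void subscribe(String subscriptionId, Consumer<String> action);
}

interface DemoLifeCycle {
    void start();
    void stop();
}

// The full model combines subscribing with life cycle control.
interface DemoSubscriptionModel extends DemoSubscribable, DemoLifeCycle {}

// Application code that only starts subscriptions asks for the narrow interface,
// so it cannot stop or restart the model by accident.
final class OrderProjection {
    final List<String> seen = new ArrayList<>();

    OrderProjection(DemoSubscribable subscribable) {
        subscribable.subscribe("order-projection", seen::add);
    }
}

// Toy model that delivers events synchronously to every registered action while running.
final class InMemoryDemoModel implements DemoSubscriptionModel {
    private final List<Consumer<String>> actions = new ArrayList<>();
    private boolean running = true;

    @Override
    public void subscribe(String subscriptionId, Consumer<String> action) {
        actions.add(action);
    }

    @Override
    public void start() {
        running = true;
    }

    @Override
    public void stop() {
        running = false;
    }

    void publish(String event) {
        if (running) {
            actions.forEach(action -> action.accept(event));
        }
    }
}
```

Only the code that owns the model (for example, application wiring) needs the full type; everything else can be handed the subscribe-only view.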
Changelog:
- Fixed a bug in InMemorySubscription that accidentally pushed null values to subscriptions every 500 millis unless an actual event was received.
- Renamed org.occurrent.subscription.mongodb.spring.blocking.SpringSubscriptionModelConfig to org.occurrent.subscription.mongodb.spring.blocking.SpringMongoSubscriptionModelConfig.
- Upgraded to Kotlin 1.4.31
- All blocking subscriptions now implement the life cycle methods defined in the org.occurrent.subscription.api.blocking.SubscriptionModelLifeCycle interface. A new interface, org.occurrent.subscription.api.blocking.Subscribable, has been defined that contains all “subscribe” methods. You can use this interface in your application if all you want to do is start subscriptions.
- Introduced a new default “StartAt” implementation called “default” (StartAt.subscriptionModelDefault()). This is different to StartAt.now() in that it will allow the subscription model to choose where to start automatically if you don’t want to start at an earlier position.
- Removed the ability to pass a supplier returning StartAt to the subscribe methods in the org.occurrent.subscription.api.blocking.Subscribable interface. Instead, use StartAt.dynamic(supplier) to achieve the same result.
- Upgraded to CloudEvents Java SDK 2.0.0
- Waiting for the internal message listener to be shut down when stopping SpringMongoSubscriptionModel.
- Using an org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor as executor in SpringMongoSubscriptionModel instead of the default org.springframework.core.task.SimpleAsyncTaskExecutor. The reason for this is that the DefaultMessageListenerContainer used internally in SpringMongoSubscriptionModel will wait for all threads in the ThreadPoolTaskExecutor to stop when stopping the SpringMongoSubscriptionModel instance. Otherwise, a race condition can occur when stopping and then immediately starting a SpringMongoSubscriptionModel.
- Introducing competing consumer support! A competing consumer subscription model wraps another subscription model to allow several subscribers to subscribe to the same subscription. One of the subscribers will get a lock on the subscription and receive events from it. If a subscriber loses its lock, another subscriber will take over automatically. To achieve distributed locking, the subscription model uses an org.occurrent.subscription.api.blocking.CompetingConsumerStrategy to support different algorithms. You can write custom algorithms by implementing this interface yourself.

  Here’s an example of how to use the CompetingConsumerSubscriptionModel. First add the org.occurrent:competing-consumer-subscription module to the classpath. This example uses the NativeMongoLeaseCompetingConsumerStrategy from module org.occurrent:subscription-mongodb-native-blocking-competing-consumer-strategy. It also wraps the DurableSubscriptionModel, which in turn wraps the Native MongoDB subscription model.

      MongoDatabase mongoDatabase = mongoClient.getDatabase("some-database");
      SubscriptionPositionStorage positionStorage = new NativeMongoSubscriptionPositionStorage(mongoDatabase, "position-storage");
      SubscriptionModel wrappedSubscriptionModel = new DurableSubscriptionModel(new NativeMongoSubscriptionModel(mongoDatabase, "events", TimeRepresentation.DATE), positionStorage);

      // Create the CompetingConsumerSubscriptionModel
      NativeMongoLeaseCompetingConsumerStrategy competingConsumerStrategy = NativeMongoLeaseCompetingConsumerStrategy.withDefaults(mongoDatabase);
      CompetingConsumerSubscriptionModel competingConsumerSubscriptionModel = new CompetingConsumerSubscriptionModel(wrappedSubscriptionModel, competingConsumerStrategy);

      // Now subscribe!
      competingConsumerSubscriptionModel.subscribe("subscriptionId", type("SomeEvent"));

  If the above code is executed on multiple nodes/processes, then only one subscriber will receive events.
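
To make the lock-and-takeover behavior of competing consumers more concrete, here is a minimal, self-contained sketch of lease-based locking in plain Java. It is not Occurrent's implementation: LeaseRegistry, its in-memory map, and the 20-second lease time used in the demo are assumptions made purely for illustration. A real strategy would presumably keep the lease in a store shared by all nodes, such as a database.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory lease store, for illustration only.
final class LeaseRegistry {
    private static final class Lease {
        final String owner;
        final Instant expiresAt;

        Lease(String owner, Instant expiresAt) {
            this.owner = owner;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<String, Lease> leases = new ConcurrentHashMap<>();
    private final Duration leaseTime;

    LeaseRegistry(Duration leaseTime) {
        this.leaseTime = leaseTime;
    }

    // Tries to take or renew the lease. Returns true if subscriberId holds it afterwards.
    boolean acquireOrRenew(String subscriptionId, String subscriberId, Instant now) {
        Lease result = leases.compute(subscriptionId, (id, current) -> {
            boolean free = current == null || !current.expiresAt.isAfter(now);
            boolean mine = current != null && current.owner.equals(subscriberId);
            // Take a free or expired lease, or extend our own; otherwise leave it untouched.
            return (free || mine) ? new Lease(subscriberId, now.plus(leaseTime)) : current;
        });
        return result.owner.equals(subscriberId);
    }

    // Gives the lease up early, for example on a clean shutdown.
    void release(String subscriptionId, String subscriberId) {
        leases.computeIfPresent(subscriptionId,
                (id, current) -> current.owner.equals(subscriberId) ? null : current);
    }
}

final class CompetingConsumerSketch {
    public static void main(String[] args) {
        LeaseRegistry registry = new LeaseRegistry(Duration.ofSeconds(20));
        Instant t0 = Instant.parse("2021-03-19T00:00:00Z");

        // Subscriber A takes the free lease; B is rejected while the lease is valid.
        System.out.println(registry.acquireOrRenew("subscriptionId", "A", t0));
        System.out.println(registry.acquireOrRenew("subscriptionId", "B", t0.plusSeconds(5)));

        // A stops renewing (for example, its node crashed); after expiry B takes over.
        System.out.println(registry.acquireOrRenew("subscriptionId", "B", t0.plusSeconds(21)));
    }
}
```

Only the subscriber that currently holds the lease would forward events to its action. The others keep retrying in the background, which is what allows one of them to take over once the holder stops renewing.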