Occurrent 0.9.0 is now available with several new features, bug fixes and improvements. Most notably is the competing consumer support.
A competing consumer subscription model wraps another subscription model to allow several subscribers to subscribe to the same subscription. One of the subscribes will get a lock of the subscription
and receive events from it. If a subscriber looses its lock, another subscriber will take over automatically. Another big change is that a new interface, org.occurrent.subscription.api.blocking.Subscribable
, is introduced that
defines all “subscribe” methods. A SubscriptionModel
now extends this interface as well as org.occurrent.subscription.api.blocking.SubscriptionModelLifeCycle
. In application code that onl wants to start subscriptions, you probably want to depend on the Subscribable
interface and not SubscriptionModel
. There are other non-backward compatible changes as well, so make sure to read the changelog below before upgrading.
Changelog:
- Fixed a bug in
InMemorySubscription
that accidentally pushednull
values to subscriptions every 500 millis unless an actual event was received. - Renamed
org.occurrent.subscription.mongodb.spring.blocking.SpringSubscriptionModelConfig
toorg.occurrent.subscription.mongodb.spring.blocking.SpringMongoSubscriptionModelConfig
. - Upgraded to Kotlin 1.4.31
- All blocking subscriptions now implements the life cycle methods defined in the
org.occurrent.subscription.api.blocking.SubscriptionModelLifeCycle
interface. A new interface,org.occurrent.subscription.api.blocking.Subscribable
has been defined, that contains all “subscribe” methods. You can use this interface in your application if all you want to do is start subscriptions. - Introduced a new default “StartAt” implementation called “default” (
StartAt.subscriptionModelDefault()
). This is different toStartAt.now()
in that it will allow the subscription model to choose where to start automatically if you don’t want to start at an earlier position. - Removed the ability to pass a supplier returning
StartAt
to the subscribe methods inorg.occurrent.subscription.api.blocking.Subscribable
interface. Instead, useStartAt.dynamic(supplier)
to achieve the same result. - Upgraded to CloudEvents Java SDK 2.0.0
- Waiting for internal message listener to be shutdown when stopping
SpringMongoSubscriptionModel
. - Using a
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
as executor inSpringMongoSubscriptionModel
instead of the defaultorg.springframework.core.task.SimpleAsyncTaskExecutor
. The reason for this is that theDefaultMessageListenerContainer
used internally inSpringMongoSubscriptionModel
will wait for all threads in theThreadPoolTaskExecutor
to stop when stopping theSpringMongoSubscriptionModel
instance. Otherwise, a race conditions can occur when stopping and then immediately starting aSpringMongoSubscriptionModel
. -
Introducing competing consumer support! A competing consumer subscription model wraps another subscription model to allow several subscribers to subscribe to the same subscription. One of the subscribes will get a lock of the subscription and receive events from it. If a subscriber looses its lock, another subscriber will take over automatically. To achieve distributed locking, the subscription model uses a
org.occurrent.subscription.api.blocking.CompetingConsumerStrategy
to support different algorithms. You can write custom algorithms by implementing this interface yourself. Here’s an example of how to use theCompetingConsumerSubscriptionModel
. First add theorg.occurrent:competing-consumer-subscription
module to classpath. This example uses theNativeMongoLeaseCompetingConsumerStrategy
from moduleorg.occurrent:subscription-mongodb-native-blocking-competing-consumer-strategy
. It also wraps the DurableSubscriptionModel which in turn wraps the Native MongoDB subscription model.MongoDatabase mongoDatabase = mongoClient.getDatabase("some-database"); SubscriptionPositionStorage positionStorage = NativeMongoSubscriptionPositionStorage(mongoDatabase, "position-storage"); SubscriptionModel wrappedSubscriptionModel = new DurableSubscriptionModel(new NativeMongoSubscriptionModel(mongoDatabase, "events", TimeRepresentation.DATE), positionStorage); // Create the CompetingConsumerSubscriptionModel NativeMongoLeaseCompetingConsumerStrategy competingConsumerStrategy = NativeMongoLeaseCompetingConsumerStrategy.withDefaults(mongoDatabase); CompetingConsumerSubscriptionModel competingConsumerSubscriptionModel = new CompetingConsumerSubscriptionModel(wrappedSubscriptionModel, competingConsumerStrategy); // Now subscribe! competingConsumerSubscriptionModel.subscribe("subscriptionId", type("SomeEvent"));
If the above code is executed on multiple nodes/processes, then only one subscriber will receive events.