Occurrent 0.9.0 is now available with several new features, bug fixes, and improvements. Most notable is competing consumer support. A competing consumer subscription model wraps another subscription model to allow several subscribers to subscribe to the same subscription. One of the subscribers will get a lock on the subscription and receive events from it. If a subscriber loses its lock, another subscriber will take over automatically. Another big change is a new interface, org.occurrent.subscription.api.blocking.Subscribable, that defines all “subscribe” methods. SubscriptionModel now extends this interface as well as org.occurrent.subscription.api.blocking.SubscriptionModelLifeCycle. In application code that only wants to start subscriptions, you probably want to depend on the Subscribable interface and not SubscriptionModel. There are other non-backward-compatible changes as well, so make sure to read the changelog below before upgrading.
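
To illustrate why depending on the narrower interface can be useful, here is a minimal, self-contained sketch. It does not use the Occurrent library: the three interfaces are reduced, hypothetical stand-ins that only borrow the names from the text above (the real ones have more methods and deliver CloudEvents rather than strings), and WelcomeEmailSubscriber, FakeSubscriptionModel and SubscribableDemo are invented for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Reduced stand-ins for illustration only; NOT the real Occurrent signatures.
interface Subscribable {
    void subscribe(String subscriptionId, Consumer<String> action);
}

interface SubscriptionModelLifeCycle {
    void start();
    void stop();
}

interface SubscriptionModel extends Subscribable, SubscriptionModelLifeCycle {
}

// Application code that only starts a subscription, so it asks for Subscribable
// and cannot accidentally stop or restart the whole subscription model.
class WelcomeEmailSubscriber {
    private final List<String> handled = new ArrayList<>();

    WelcomeEmailSubscriber(Subscribable subscribable) {
        subscribable.subscribe("welcome-emails", handled::add);
    }

    List<String> handled() {
        return handled;
    }
}

// Hypothetical in-memory model used to drive the example.
class FakeSubscriptionModel implements SubscriptionModel {
    private final Map<String, Consumer<String>> subscribers = new LinkedHashMap<>();
    private boolean running = true;

    @Override
    public void subscribe(String subscriptionId, Consumer<String> action) {
        subscribers.put(subscriptionId, action);
    }

    @Override
    public void start() {
        running = true;
    }

    @Override
    public void stop() {
        running = false;
    }

    // Delivers an event to every subscriber while the model is running.
    void publish(String event) {
        if (running) {
            subscribers.values().forEach(action -> action.accept(event));
        }
    }
}

class SubscribableDemo {
    public static void main(String[] args) {
        FakeSubscriptionModel model = new FakeSubscriptionModel();
        WelcomeEmailSubscriber subscriber = new WelcomeEmailSubscriber(model);
        model.publish("UserRegistered");
        model.stop();
        model.publish("ignored while stopped");
        System.out.println(subscriber.handled()); // prints [UserRegistered]
    }
}
```

Only the component that owns the subscription model (for example the application's bootstrap code) needs the life cycle methods; everything else can be handed a Subscribable.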

Changelog:

  • Fixed a bug in InMemorySubscription that accidentally pushed null values to subscriptions every 500 millis unless an actual event was received.
  • Renamed org.occurrent.subscription.mongodb.spring.blocking.SpringSubscriptionModelConfig to org.occurrent.subscription.mongodb.spring.blocking.SpringMongoSubscriptionModelConfig.
  • Upgraded to Kotlin 1.4.31
  • All blocking subscriptions now implement the life cycle methods defined in the org.occurrent.subscription.api.blocking.SubscriptionModelLifeCycle interface. A new interface, org.occurrent.subscription.api.blocking.Subscribable, has been defined that contains all “subscribe” methods. You can use this interface in your application if all you want to do is start subscriptions.
  • Introduced a new default “StartAt” implementation called “default” (StartAt.subscriptionModelDefault()). This is different to StartAt.now() in that it will allow the subscription model to choose where to start automatically if you don’t want to start at an earlier position.
  • Removed the ability to pass a supplier returning StartAt to the subscribe methods in org.occurrent.subscription.api.blocking.Subscribable interface. Instead, use StartAt.dynamic(supplier) to achieve the same result.
  • Upgraded to CloudEvents Java SDK 2.0.0
  • Waiting for the internal message listener to shut down when stopping SpringMongoSubscriptionModel.
  • Using an org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor as the executor in SpringMongoSubscriptionModel instead of the default org.springframework.core.task.SimpleAsyncTaskExecutor. The reason for this is that the DefaultMessageListenerContainer used internally in SpringMongoSubscriptionModel will wait for all threads in the ThreadPoolTaskExecutor to stop when stopping the SpringMongoSubscriptionModel instance. Otherwise, a race condition can occur when stopping and then immediately starting a SpringMongoSubscriptionModel.
  • Introducing competing consumer support! A competing consumer subscription model wraps another subscription model to allow several subscribers to subscribe to the same subscription. One of the subscribers will get a lock on the subscription and receive events from it. If a subscriber loses its lock, another subscriber will take over automatically. To achieve distributed locking, the subscription model uses an org.occurrent.subscription.api.blocking.CompetingConsumerStrategy to support different algorithms. You can write custom algorithms by implementing this interface yourself. Here’s an example of how to use the CompetingConsumerSubscriptionModel. First add the org.occurrent:competing-consumer-subscription module to the classpath. This example uses the NativeMongoLeaseCompetingConsumerStrategy from the module org.occurrent:subscription-mongodb-native-blocking-competing-consumer-strategy. It also wraps the DurableSubscriptionModel, which in turn wraps the Native MongoDB subscription model.

    // Set up the durable subscription model that will be wrapped
    MongoDatabase mongoDatabase = mongoClient.getDatabase("some-database");
    SubscriptionPositionStorage positionStorage = new NativeMongoSubscriptionPositionStorage(mongoDatabase, "position-storage");
    SubscriptionModel wrappedSubscriptionModel = new DurableSubscriptionModel(new NativeMongoSubscriptionModel(mongoDatabase, "events", TimeRepresentation.DATE), positionStorage);
    // Create the CompetingConsumerSubscriptionModel
    NativeMongoLeaseCompetingConsumerStrategy competingConsumerStrategy = NativeMongoLeaseCompetingConsumerStrategy.withDefaults(mongoDatabase);
    CompetingConsumerSubscriptionModel competingConsumerSubscriptionModel = new CompetingConsumerSubscriptionModel(wrappedSubscriptionModel, competingConsumerStrategy);
    // Now subscribe!
    competingConsumerSubscriptionModel.subscribe("subscriptionId", type("SomeEvent"));

    If the above code is executed on multiple nodes/processes, then only one subscriber will receive events.
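
The lock hand-over described for competing consumers can be pictured with a small lease table. The sketch below is only a conceptual illustration under stated assumptions, not how NativeMongoLeaseCompetingConsumerStrategy is implemented: LeaseStore and LeaseDemo are hypothetical names, the lease lives in process memory instead of a shared database, and time is passed in explicitly so the behaviour is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory lease table. A real strategy would keep the lease in a
// store shared by all nodes so that the lock is truly distributed.
class LeaseStore {
    private static final class Lease {
        final String owner;
        final long expiresAt;

        Lease(String owner, long expiresAt) {
            this.owner = owner;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<String, Lease> leases = new HashMap<>();
    private final long leaseMillis;

    LeaseStore(long leaseMillis) {
        this.leaseMillis = leaseMillis;
    }

    // Returns true if subscriberId holds the lock after this call, either because
    // the lease was free or expired, or because the subscriber renewed its own lease.
    synchronized boolean acquireOrRenew(String subscriptionId, String subscriberId, long now) {
        Lease current = leases.get(subscriptionId);
        boolean free = current == null || current.expiresAt <= now;
        if (free || current.owner.equals(subscriberId)) {
            leases.put(subscriptionId, new Lease(subscriberId, now + leaseMillis));
            return true;
        }
        return false;
    }

    // Gives up the lock early, but only if the caller actually owns it.
    synchronized void release(String subscriptionId, String subscriberId) {
        Lease current = leases.get(subscriptionId);
        if (current != null && current.owner.equals(subscriberId)) {
            leases.remove(subscriptionId);
        }
    }
}

class LeaseDemo {
    public static void main(String[] args) {
        LeaseStore store = new LeaseStore(1_000);
        // node-a gets the lock first and would receive the events.
        System.out.println(store.acquireOrRenew("subscriptionId", "node-a", 0));     // true
        // node-b competes while the lease is still valid and is rejected.
        System.out.println(store.acquireOrRenew("subscriptionId", "node-b", 500));   // false
        // node-a stopped renewing, so after expiry node-b takes over.
        System.out.println(store.acquireOrRenew("subscriptionId", "node-b", 1_500)); // true
    }
}
```

In this picture, the subscriber that currently holds the lease is the one that forwards events to its action, and the others keep retrying until the lease is released or expires.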