Occurrent 0.7.0 is now available. It introduces several new methods to the (blocking) subscription model implementations, available in the new SubscriptionModelLifeCycle interface. The interface allows canceling, pausing and resuming individual subscriptions. You can also stop an entire subscription model temporarily (stop) and restart it later (start). This is really useful when writing integration tests, for example when you want to write events to the event store without triggering all subscriptions listening to the events. The life-cycle methods allow you to selectively start/stop individual subscriptions so that you can (integration) test them in isolation.
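
To make the pause/resume and stop/start semantics concrete, here is a minimal, self-contained sketch. It does not use Occurrent's classes: ToySubscriptionModel and its method names are hypothetical stand-ins, and the toy simply drops events while a subscription is paused or the model is stopped, which a real subscription model need not do.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical stand-in for a blocking subscription model with life-cycle
// control. This is NOT Occurrent's implementation; all names are illustrative.
class ToySubscriptionModel {
    private final Map<String, Consumer<String>> subscriptions = new LinkedHashMap<>();
    private final Set<String> paused = new HashSet<>();
    private boolean running = true;

    void subscribe(String subscriptionId, Consumer<String> action) {
        subscriptions.put(subscriptionId, action);
    }

    void pauseSubscription(String subscriptionId) { paused.add(subscriptionId); }

    void resumeSubscription(String subscriptionId) { paused.remove(subscriptionId); }

    void cancelSubscription(String subscriptionId) {
        subscriptions.remove(subscriptionId);
        paused.remove(subscriptionId);
    }

    void stop() { running = false; }

    void start() { running = true; }

    // Deliver an event to every subscription that is currently active.
    void publish(String event) {
        if (!running) {
            return;
        }
        subscriptions.forEach((id, action) -> {
            if (!paused.contains(id)) {
                action.accept(event);
            }
        });
    }

    // Walks through the scenario described above and records what each subscription saw.
    static List<String> demo() {
        ToySubscriptionModel model = new ToySubscriptionModel();
        List<String> received = new ArrayList<>();
        model.subscribe("emails", e -> received.add("emails:" + e));
        model.subscribe("projections", e -> received.add("projections:" + e));

        model.pauseSubscription("emails");
        model.publish("e1");                 // only "projections" sees e1
        model.stop();
        model.publish("e2");                 // the whole model is stopped, nobody sees e2
        model.start();
        model.resumeSubscription("emails");
        model.publish("e3");                 // both subscriptions see e3
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [projections:e1, emails:e3, projections:e3]
    }
}
```

In an integration test the same idea lets you pause every subscription except the one under test before writing events.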

Another big change is the much improved RetryStrategy. You can now configure max attempts, a retry predicate, an error listener, and the backoff strategy. Retry is provided in its own module, org.occurrent:retry, but many modules already depend on this module transitively. Here’s an example:

RetryStrategy retryStrategy = RetryStrategy.exponentialBackoff(Duration.ofMillis(50), Duration.ofMillis(200), 2.0)
                                 .retryIf(throwable -> throwable instanceof OptimisticLockingException)
                                 .maxAttempts(5)
                                 .onError((info, throwable) -> log.warn("Caught exception {}, will retry in {} millis", throwable.getClass().getSimpleName(), info.getDuration().toMillis()));

retryStrategy.execute(Something::somethingThing);  
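
For intuition about the three arguments to exponentialBackoff (initial delay, max delay, multiplier), the sketch below computes delays the way exponential backoff is commonly defined: initial × multiplier^(n − 1), capped at the maximum. Whether Occurrent uses exactly this formula is an assumption; BackoffSketch and its methods are hypothetical and not part of the library.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a common way to compute exponential backoff delays.
// Whether Occurrent's RetryStrategy uses exactly this formula is an assumption.
class BackoffSketch {

    // Delay before the n-th retry (1-based): initial * multiplier^(n - 1), capped at max.
    static Duration delayBeforeRetry(Duration initial, Duration max, double multiplier, int retryNumber) {
        double millis = initial.toMillis() * Math.pow(multiplier, retryNumber - 1);
        return Duration.ofMillis((long) Math.min(millis, (double) max.toMillis()));
    }

    // Delays (in milliseconds) for the first `retries` retries.
    static List<Long> delaysMillis(Duration initial, Duration max, double multiplier, int retries) {
        List<Long> delays = new ArrayList<>();
        for (int n = 1; n <= retries; n++) {
            delays.add(delayBeforeRetry(initial, max, multiplier, n).toMillis());
        }
        return delays;
    }

    public static void main(String[] args) {
        // Same parameters as the example above: 50 ms initial, 200 ms max, multiplier 2.0.
        System.out.println(delaysMillis(Duration.ofMillis(50), Duration.ofMillis(200), 2.0, 4));
        // prints [50, 100, 200, 200]
    }
}
```

Under this formula the cap is reached on the third retry, so further retries wait the maximum delay.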

Changes are:

  • Introduced many more life-cycle methods to blocking subscription models. It’s now possible to pause/resume individual subscriptions as well as start/stop all subscriptions. This is useful for testing purposes when you want to write events to the event store without triggering all subscriptions. Subscription models that support this implement the new org.occurrent.subscription.api.blocking.SubscriptionModelLifeCycle interface. Supported subscription models are: InMemorySubscriptionModel, NativeMongoSubscriptionModel and SpringMongoSubscriptionModel.
  • The SpringMongoSubscriptionModel now implements org.springframework.context.SmartLifecycle, which means that if you define it as a bean, Spring can control it like any other life-cycle bean.
  • Introduced the org.occurrent.subscription.api.blocking.DelegatingSubscriptionModel interface. Subscription models that wrap other subscription models and delegate subscriptions to them implement this interface. It contains methods for getting the wrapped subscription model. This is useful for testing purposes, for example if the underlying subscription model needs to be stopped/started.
  • Fixed a bug with command composition that accidentally included the “previous events” when invoking the generated composition function.
  • Added more command composition extension functions for Kotlin. You can now compose lists of functions and not only sequences.
  • The SpringMongoSubscriptionModel now evaluates the “start at” supplier passed to the subscribe method each time a subscription is resumed.
  • Fixed a bug in InMemorySubscription where the waitUntilStarted(Duration) method always returned false.
  • InMemorySubscription now really waits for the subscription to start when calling waitUntilStarted(Duration) and waitUntilStarted().
  • Moved the cancelSubscription method from the org.occurrent.subscription.api.blocking.SubscriptionModel to the org.occurrent.subscription.api.blocking.SubscriptionModelCancelSubscription interface. This interface is also extended by org.occurrent.subscription.api.blocking.SubscriptionModelLifeCycle.
  • Introduced a much improved RetryStrategy. You can now configure max attempts, a retry predicate, an error listener, and the backoff strategy. Retry is provided in its own module, org.occurrent:retry, but many modules already depend on this module transitively. Here’s an example:

    RetryStrategy retryStrategy = RetryStrategy.exponentialBackoff(Duration.ofMillis(50), Duration.ofMillis(200), 2.0)
                                     .retryIf(throwable -> throwable instanceof OptimisticLockingException)
                                     .maxAttempts(5)
                                     .onError((info, throwable) -> log.warn("Caught exception {}, will retry in {} millis", throwable.getClass().getSimpleName(), info.getDuration().toMillis()));
      
    retryStrategy.execute(Something::somethingThing);  
    

    RetryStrategy is immutable, which means that you can safely do things like this:

    RetryStrategy retryStrategy = RetryStrategy.retry().fixed(200).maxAttempts(5);
    // Uses default 200 ms fixed delay
    retryStrategy.execute(() -> Something.something());
    // Use 600 ms fixed delay
    retryStrategy.backoff(fixed(600)).execute(() -> SomethingElse.somethingElse());
    // 200 ms fixed delay again
    retryStrategy.execute(() -> Thing.thing());
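
The reason the original retryStrategy still uses 200 ms after the backoff(fixed(600)) call is that an immutable object never changes in place: each configuration method returns a new instance. The sketch below shows that pattern in isolation. ImmutableRetrySketch, withDelay and withMaxAttempts are hypothetical names chosen for illustration and are not Occurrent's API.

```java
import java.time.Duration;

// Hypothetical immutable configuration object; NOT Occurrent's RetryStrategy.
final class ImmutableRetrySketch {
    final Duration delay;
    final int maxAttempts;

    ImmutableRetrySketch(Duration delay, int maxAttempts) {
        this.delay = delay;
        this.maxAttempts = maxAttempts;
    }

    // Returns a copy with a different delay; `this` is left untouched.
    ImmutableRetrySketch withDelay(Duration newDelay) {
        return new ImmutableRetrySketch(newDelay, maxAttempts);
    }

    // Returns a copy with a different attempt limit; `this` is left untouched.
    ImmutableRetrySketch withMaxAttempts(int newMaxAttempts) {
        return new ImmutableRetrySketch(delay, newMaxAttempts);
    }

    public static void main(String[] args) {
        ImmutableRetrySketch base = new ImmutableRetrySketch(Duration.ofMillis(200), 5);
        ImmutableRetrySketch slower = base.withDelay(Duration.ofMillis(600));

        System.out.println(base.delay.toMillis());   // prints 200
        System.out.println(slower.delay.toMillis()); // prints 600
        System.out.println(slower.maxAttempts);      // prints 5
    }
}
```

Because nothing is mutated, a shared base strategy can be stored in a field and customized per call site without one caller affecting another.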