Occurrent 0.7.0 is now available. It introduces several new methods to the (blocking) subscription model implementations.
These methods are available in the new SubscriptionModelLifeCycle interface.
It allows canceling, pausing and resuming individual subscriptions.
You can also stop an entire subscription model temporarily (stop) and restart it later (start).
This is really useful when writing integration tests. For example, if you want to write events to the event store without triggering all
subscriptions listening to the events. The life cycle methods allows you to selectively start/stop individual subscriptions so that you can (integration) test them in isolation.
Another big change is the much improved RetryStrategy. You can now configure max attempts, a retry predicate, error listener as well as the backoff strategy.
Retry is provided in its own module, org.occurrent:retry, but many modules already depend on this module transitively. Here’s an example:
RetryStrategy retryStrategy = RetryStrategy.exponentialBackoff(Duration.ofMillis(50), Duration.ofMillis(200), 2.0)
.retryIf(throwable -> throwable instanceof OptimisticLockingException)
.maxAttempts(5)
.onError((info, throwable) -> log.warn("Caught exception {}, will retry in {} millis")), throwable.class.getSimpleName(), info.getDuration().toMillis()));
retryStrategy.execute(Something::somethingThing);
Changes are:
- Introduced many more life-cycle methods to blocking subscription models. It’s now possible to pause/resume individual subscriptions
as well as starting/stopping all subscriptions. This is useful for testing purposes when you want to write events
to the event store without triggering all subscriptions. The subscription models that supports this
implements the new
org.occurrent.subscription.api.blocking.SubscriptionModelLifeCycleinterface. Supported subscription models are:InMemorySubscriptionModel,NativeMongoSubscriptionModelandSpringMongoSubscriptionModel. - The
SpringMongoSubscriptionModelnow implementsorg.springframework.context.SmartLifecycle, which means that if you define it as a bean, it allows controlling it as a regular Spring life-cycle bean. - Introduced the
org.occurrent.subscription.api.blocking.DelegatingSubscriptionModelinterface. Subscription models that wraps other subscription models and delegates subscriptions to them implements this interface. It contains methods for getting the wrapped subscription model. This is useful for testing purposes, if the underlying subscription model needs to stopped/started etc. - Fixed a bug with command composition that accidentally included the “previous events” when invoking the generated composition function.
- Added more command composition extension functions for Kotlin. You can now compose lists of functions and not only sequences.
- The
SpringMongoSubscriptionModelnow evaluates the “start at” supplier passed to thesubscribemethod each time a subscription is resumed. - Fixed a bug in
InMemorySubscriptionwhere thewaitUntilStarted(Duration)method always returnedfalse. InMemorySubscriptionnow really waits for the subscription to start when callingwaitUntilStarted(Duration)andwaitUntilStarted.- Moved the
cancelSubscriptionmethod from theorg.occurrent.subscription.api.blocking.SubscriptionModelto theorg.occurrent.subscription.api.blocking.SubscriptionModelCancelSubscriptioninterface. This interface is also extended byorg.occurrent.subscription.api.blocking.SubscriptionModelLifeCycle. -
Introduced a much improved
RetryStrategy. You can now configure max attempts, a retry predicate, error listener as well as the backoff strategy. Retry is provided in its own module,org.occurrent:retry, but many modules already depend on this module transitively. Here’s an example:RetryStrategy retryStrategy = RetryStrategy.exponentialBackoff(Duration.ofMillis(50), Duration.ofMillis(200), 2.0) .retryIf(throwable -> throwable instanceof OptimisticLockingException) .maxAttempts(5) .onError((info, throwable) -> log.warn("Caught exception {}, will retry in {} millis")), throwable.class.getSimpleName(), info.getDuration().toMillis())); retryStrategy.execute(Something::somethingThing);RetryStrategyis immutable, which means that you can safely do things like this:RetryStrategy retryStrategy = RetryStrategy.retry().fixed(200).maxAttempts(5); // Uses default 200 ms fixed delay retryStrategy.execute(() -> Something.something()); // Use 600 ms fixed delay retryStrategy.backoff(fixed(600)).execute(() -> SomethingElse.somethingElse()); // 200 ms fixed delay again retryStrategy.execute(() -> Thing.thing());