Occurrent 0.7.0 is now available. It introduces several new methods to the (blocking) subscription model implementations.
These methods are available in the new SubscriptionModelLifeCycle interface.
It allows canceling, pausing and resuming individual subscriptions.
You can also stop an entire subscription model temporarily (stop
) and restart it later (start
).
This is really useful when writing integration tests. For example, if you want to write events to the event store without triggering all
subscriptions listening to the events. The life cycle methods allows you to selectively start/stop individual subscriptions so that you can (integration) test them in isolation.
Another big change is the much improved RetryStrategy. You can now configure max attempts, a retry predicate, error listener as well as the backoff strategy.
Retry is provided in its own module, org.occurrent:retry
, but many modules already depend on this module transitively. Here’s an example:
RetryStrategy retryStrategy = RetryStrategy.exponentialBackoff(Duration.ofMillis(50), Duration.ofMillis(200), 2.0)
.retryIf(throwable -> throwable instanceof OptimisticLockingException)
.maxAttempts(5)
.onError((info, throwable) -> log.warn("Caught exception {}, will retry in {} millis")), throwable.class.getSimpleName(), info.getDuration().toMillis()));
retryStrategy.execute(Something::somethingThing);
Changes are:
- Introduced many more life-cycle methods to blocking subscription models. It’s now possible to pause/resume individual subscriptions
as well as starting/stopping all subscriptions. This is useful for testing purposes when you want to write events
to the event store without triggering all subscriptions. The subscription models that supports this
implements the new
org.occurrent.subscription.api.blocking.SubscriptionModelLifeCycle
interface. Supported subscription models are:InMemorySubscriptionModel
,NativeMongoSubscriptionModel
andSpringMongoSubscriptionModel
. - The
SpringMongoSubscriptionModel
now implementsorg.springframework.context.SmartLifecycle
, which means that if you define it as a bean, it allows controlling it as a regular Spring life-cycle bean. - Introduced the
org.occurrent.subscription.api.blocking.DelegatingSubscriptionModel
interface. Subscription models that wraps other subscription models and delegates subscriptions to them implements this interface. It contains methods for getting the wrapped subscription model. This is useful for testing purposes, if the underlying subscription model needs to stopped/started etc. - Fixed a bug with command composition that accidentally included the “previous events” when invoking the generated composition function.
- Added more command composition extension functions for Kotlin. You can now compose lists of functions and not only sequences.
- The
SpringMongoSubscriptionModel
now evaluates the “start at” supplier passed to thesubscribe
method each time a subscription is resumed. - Fixed a bug in
InMemorySubscription
where thewaitUntilStarted(Duration)
method always returnedfalse
. InMemorySubscription
now really waits for the subscription to start when callingwaitUntilStarted(Duration)
andwaitUntilStarted
.- Moved the
cancelSubscription
method from theorg.occurrent.subscription.api.blocking.SubscriptionModel
to theorg.occurrent.subscription.api.blocking.SubscriptionModelCancelSubscription
interface. This interface is also extended byorg.occurrent.subscription.api.blocking.SubscriptionModelLifeCycle
. -
Introduced a much improved
RetryStrategy
. You can now configure max attempts, a retry predicate, error listener as well as the backoff strategy. Retry is provided in its own module,org.occurrent:retry
, but many modules already depend on this module transitively. Here’s an example:RetryStrategy retryStrategy = RetryStrategy.exponentialBackoff(Duration.ofMillis(50), Duration.ofMillis(200), 2.0) .retryIf(throwable -> throwable instanceof OptimisticLockingException) .maxAttempts(5) .onError((info, throwable) -> log.warn("Caught exception {}, will retry in {} millis")), throwable.class.getSimpleName(), info.getDuration().toMillis())); retryStrategy.execute(Something::somethingThing);
RetryStrategy
is immutable, which means that you can safely do things like this:RetryStrategy retryStrategy = RetryStrategy.retry().fixed(200).maxAttempts(5); // Uses default 200 ms fixed delay retryStrategy.execute(() -> Something.something()); // Use 600 ms fixed delay retryStrategy.backoff(fixed(600)).execute(() -> SomethingElse.somethingElse()); // 200 ms fixed delay again retryStrategy.execute(() -> Thing.thing());