SmallRye Mutiny
https://smallrye.io/smallrye-mutiny/latest/
Mutiny provides only two types (Multi and Uni), which can handle any kind of asynchronous interactions.
Mutiny is a novel reactive programming library. It provides a simple but powerful asynchronous development model that lets you build reactive applications. Mutiny can be used in any Java application exhibiting asynchrony. From reactive microservices, data streaming, event processing to API gateways and network utilities, Mutiny is a great fit.
Examples SmallRye Mutiny
https://github.com/smallrye/smallrye-mutiny/tree/main/workshop-examples
Mutiny
Getting started with Mutiny
Java
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny</artifactId>
<version>2.5.3</version>
</dependency>Quarkus
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mutiny</artifactId>
</dependency>Vert.x
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-core</artifactId>
<version>3.7.2</version>
</dependency>Hello Mutiny!
import io.smallrye.mutiny.Uni;
public class FirstProgram {
public static void main(String[] args) {
Uni.createFrom().item("hello")
.onItem().transform(item -> item + " mutiny")
.onItem().transform(String::toUpperCase)
.subscribe().with(item -> System.out.println(">> " + item));
}
}Uni is one of the two types with Multi that Mutiny provides:
Uni– is a stream emitting either a single item or a failure.Multi– represents streams of 0..* items (potentially unbounded)
Uni
A Uni represents a stream that can only emit either an item or a failure event.
The Uni type
A Uni<T> is a specialized stream that emits only an item or a failure. Typically, Uni<T> are great to represent asynchronous actions such as a remote procedure call, an HTTP request, or an operation producing a single result.
Uni<T> provides many operators that create, transform, and orchestrate Uni sequences.
As said, Uni<T> emits either an item or a failure. Note that the item can be null, and the Uni API has specific methods for this case.
Typically, a Uni<Void> always emits null as item event or a failure if the represented operation fails. You can consider the item event as a completion signal indicating the success of the operation.
The offered operators can be used to define a processing pipeline. The event, either the item or failure, flows in this pipeline, and each operator can process or transform the event. Unis are lazy by nature.
To trigger the computation, you must have a final subscriber indicating your interest. The following snippet provides a simple example of pipeline using Uni:
Uni.createFrom().item(1)
.onItem().transform(i -> "hello-" + i)
.onItem().delayIt().by(Duration.ofMillis(100))
.subscribe().with(System.out::println);
Subscribing to a Uni
Important: Remember: if you don’t subscribe, nothing is going to happen. What’s more, the pipeline is materialized for each subscription.
When subscribing to a Uni, you can pass an item callback (invoked when the item is emitted), or two callbacks (one receiving the item and one receiving the failure):
Cancellable cancellable = uni
.subscribe().with(
item -> System.out.println(item),
failure -> System.out.println("Failed with " + failure));Note the returned Cancellable: this object allows canceling the operation if need be.
Creating Unis from items
There are many ways to create Uni instances. Use Uni.createFrom() to see all the possibilities.
You can, for instance, create a Uni from a known value:
Uni<Integer> uni = Uni.createFrom().item(1);Every subscriber receives the item 1 just after the subscription.
You can also pass a Supplier:
AtomicInteger counter = new AtomicInteger();
Uni<Integer> uni = Uni.createFrom().item(() -> counter.getAndIncrement());The Supplier is called for every subscriber. So, each of them will get a different value.
Creating failing Unis
Operations represented by Unis can also emit a failure event, indicating that the operation failed.
You can create failed Uni instances with:
// Pass an exception directly:
Uni<Integer> failed1 = Uni.createFrom().failure(new Exception("boom"));
// Pass a supplier called for every subscriber:
Uni<Integer> failed2 = Uni.createFrom().failure(() -> new Exception("boom"));Creating Uni<Void>
When the represented operation to not produce a result, you still need a way to indicate the operation’s completion. For this, you need to emit a null item:
Uni<Void> uni = Uni.createFrom().nullItem();Creating Unis using an emitter (advanced)
You can create a Uni using an emitter. This approach is useful when integrating callback-based APIs:
Uni<String> uni = Uni.createFrom().emitter(em -> {
// When the result is available, emit it
em.complete(result);
});The emitter can also send a failure. It can also get notified of cancellation to, for example, stop the work in progress.
Creating Unis from a CompletionStage (advanced)
You can also Uni objects from CompletionStage / CompletableFuture. This is useful when integrating with APIs that are based on these types:
Uni<String> uni = Uni.createFrom().completionStage(stage);Tip: You can also create a CompletionStage from a Uni using uni.subscribe().asCompletionStage()
Multi
A Multi represents a stream of data. A stream can emit 0, 1, n, or an infinite number of items.
The Multi type
A Multi<T> is a data stream that:
- emits
0..nitem events - emits a failure event
- emits a completion event for bounded streams
Warning: Failures are terminal events: after having received a failure no further item will be emitted.
Multi<T> provides many operators that create, transform, and orchestrate Multi sequences. The operators can be used to define a processing pipeline. The events flow in this pipeline, and each operator can process or transform the events.
Multis are lazy by nature. To trigger the computation, you must subscribe.
The following snippet provides a simple example of pipeline using Multi:
Multi.createFrom().items(1, 2, 3, 4, 5)
.onItem().transform(i -> i * 2)
.select().first(3)
.onFailure().recoverWithItem(0)
.subscribe().with(System.out::println);
Note the returned Cancellable: this object allows canceling the stream if need be.
Creating Multi from items
There are many ways to create Multi instances. See Multi.createFrom() to see all the possibilities.
For instance, you can create a Multi from known items or from an Iterable:
Multi<Integer> multiFromItems = Multi.createFrom().items(1, 2, 3, 4);
Multi<Integer> multiFromIterable = Multi.createFrom().iterable(Arrays.asList(1, 2, 3, 4, 5));
Every subscriber receives the same set of items (1, 2… 5) just after the subscription.
You can also use Suppliers:
AtomicInteger counter = new AtomicInteger();
Multi<Integer> multi = Multi.createFrom().items(() ->
IntStream.range(counter.getAndIncrement(), counter.get() * 2).boxed());
The Supplier is called for every subscriber, so each of them will get different values.
Tip: You can create ranges using Multi.createFrom().range(start, end).
Creating failing Multis
Streams can also fail.
Failures are used to indicate to the downstream subscribers that the source encountered a terrible error and cannot continue emitting items. Create failed Multi instances with:
// Pass an exception directly:
Multi<Integer> failed1 = Multi.createFrom().failure(new Exception("boom"));
// Pass a supplier called for every subscriber:
Multi<Integer> failed2 = Multi.createFrom().failure(() -> new Exception("boom"));
Creating empty Multis
Unlike Uni, Multi streams don’t send null items (this is forbidden in reactive streams).
Instead Multi streams send completion events indicating that there are no more items to consume. Of course, the completion event can happen even if there are no items, creating an empty stream.
You can create such a stream using:
Multi<String> multi = Multi.createFrom().empty();Creating Multis using an emitter (advanced)
You can create a Multi using an emitter. This approach is useful when integrating callback-based APIs:
Multi<Integer> multi = Multi.createFrom().emitter(em -> {
em.emit(1);
em.emit(2);
em.emit(3);
em.complete();
});The emitter can also send a failure. It can also get notified of cancellation to, for example, stop the work in progress.
Creating Multis from ticks (advanced)
You can create a stream that emit a ticks periodically:
Multi<Long> ticks = Multi.createFrom().ticks().every(Duration.ofMillis(100));
The downstream receives a long, which is a counter. For the first tick, it’s 0, then 1, then 2, and so on.
Creating Multis from a generator (advanced)
You can create a stream from some initial state, and a generator function:
Multi<Object> sequence = Multi.createFrom().generator(() -> 1, (n, emitter) -> {
int next = n + (n / 2) + 1;
if (n < 50) {
emitter.emit(next);
} else {
emitter.complete();
}
return next;
});
The initial state is given through a supplier (here () -> 1). The generator function accepts 2 arguments:
- the current state,
- an emitter that can emit a new item, emit a failure, or emit a completion.
The generator function return value is the next current state. Running the previous example gives the following number suite: {2, 4, 7, 11, 17, 26, 40, 61}.
