MicroProfile Reactive Messaging
- MicroProfile Reactive Messaging Specification
- eclipse / microprofile-reactive-messaging
- MicroProfile Reactive Messaging does not contain an implementation itself but only provides the specified API, a TCK and documentation. The following implementations are available:
- Lightbend Alpakka
- SmallRye Reactive Messaging
- Open Liberty 19.0.0.9+ usage (via SmallRye Reactive Messaging)
- MicroProfile Reactive Messaging does not contain an implementation itself but only provides the specified API, a TCK and documentation. The following implementations are available:
- eclipse / microprofile-reactive-messaging
Development Model
The Reactive Messaging specification defines a development model for declaring CDI beans
producing, consuming and processing messages. The communication between these components
uses Reactive Streams. These messages can be wholly internal to the application or can be sent and received via different
message brokers.

Application’s beans contain methods annotated with @Incoming and @Outgoing annotations. A method with an @Incoming annotation consumes messages from a channel. A method with an @Outgoing annotation publishes messages to a channel. A method with both an @Incoming and an @Outgoing annotation is a message processor, it consumes messages from a channel, does some transformation to them, and publishes messages to another channel.
Channel
A channel is a name indicating which source or destination of messages is used. Channels are opaque Strings.
There are two types of channel:
- Internal channels are local to the application. They allows implementing multi-step processing where several beans from the same application form a chain of processing.
- Channels can be connected to remote brokers or various message transport layers such as Apache Kafka or to an AMQP broker. These channels are managed by connectors.
If a component receives messages from a channel, we call this channel an upstream channel. If a component produces messages to a channel, we call this channel a downstream channel. Messages flow from upstream to downstream until it reaches a final consumer.
Message
At the core of the Reactive Messaging specification is the concept of message. A message is an envelope wrapping a payload. A message is sent to a specific channel and, when received and processed successfully, acknowledged.
Messages are represented by the org.eclipse.microprofile.reactive.messaging.Message class.
This interface is intentionally kept minimal. The aim is that connectors will provide their own implementations with additional metadata that is relevant to that connector. For instance, a KafkaMessage would provide access to the topic and partition.
The org.eclipse.microprofile.reactive.messaging.Message#getPayload method retrieves the wrapped payload.
The org.eclipse.microprofile.reactive.messaging.Message#ack method acknowledges the message.
The org.eclipse.microprofile.reactive.messaging.Message#nack method reports a negative acknowledgement.
Note that the ack and nack methods are asynchronous as acknowledgement is generally an asynchronous process.
Plain messages are created using:
- org.eclipse.microprofile.reactive.messaging.Message#of(T) – wraps the given payload, no acknowledgement
- org.eclipse.microprofile.reactive.messaging.Message#of(T, java.util.function.Supplier>) – wraps the given payload and provides the acknowledgment logic
- org.eclipse.microprofile.reactive.messaging.Message#of(Supplier> ack, Function> nack) – wraps the given payload and provides the
acknowledgment and negative acknowledgment logic
–
SmallRye Reactive Messaging
Development Model
Reactive Messaging provides two main annotations:
org.eclipse.microprofile.reactive.messaging.Incoming– indicates the consumed channelorg.eclipse.microprofile.reactive.messaging.Outgoing– indicates the populated channel
These annotations are used on methods:
package beans;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class MessageProcessingBean {
@Incoming("consumed-channel")
@Outgoing("populated-channel")
public Message<String> process(Message<String> in) {
// Process the payload
String payload = in.getPayload().toUpperCase();
// Create a new message from `in` and just update the payload
return in.withPayload(payload);
}
}Connectors
Reactive Messaging can handle messages generated from within the application but also interact with remote brokers. Reactive Messaging Connectors interacts with these remote brokers to retrieve messages and send messages using various protocols and technology.
Each connector handles to a specific technology. For example, a Kafka Connector is responsible for interacting with Kafka, while an MQTT Connector is responsible for MQTT interactions.
Configuring connectors
The application (src/main/resources/META-INF/[microprofile-config].properties) configures the connector with a set of properties structured as follows:
mp.messaging.[incoming|outgoing].[channel-name].[attribute]=[value]
For example:
mp.messaging.incoming.dummy-incoming-channel.connector=dummy
mp.messaging.incoming.dummy-incoming-channel.attribute=value
mp.messaging.outgoing.dummy-outgoing-channel.connector=dummy
mp.messaging.outgoing.dummy-outgoing-channel.attribute=value
You configure each channel (both incoming and outgoing) individually.
–
