Event Bus

The event bus is a core part of Vert.x and the main way verticles communicate with each other. It is a lightweight messaging system that can also be distributed across clustered Vert.x instances, so verticles exchange messages by address without holding references to one another. The event bus supports three patterns: point-to-point messaging, request-response (point-to-point with a reply), and publish-subscribe.

Point-to-Point Communication

Point-to-point messaging involves one verticle sending a message to an address, with only one consumer receiving it. If several consumers are registered on the same address, Vert.x picks one of them using a non-strict round-robin algorithm.

Example#1

A simple verticle listening for messages on an address:

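import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
// Assumption: SLF4J is the logging API in use; the original does not say.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReceiverVerticle extends AbstractVerticle {

  private static final Logger LOGGER = LoggerFactory.getLogger(ReceiverVerticle.class);

  @Override
  public void start() throws Exception {
    // Register a consumer on the "incoming.message" address
    vertx.eventBus().consumer("incoming.message", this::onMessage);
  }

  // Called for every message received on the address.
  private void onMessage(Message<JsonObject> received) {
    JsonObject message = received.body();
    LOGGER.info("Message Received " + message);
    // Echo the payload back to the sender as the reply
    received.reply(message);
  }
}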

To send a message to the ReceiverVerticle:

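import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.http.HttpServer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import java.util.UUID;

public class SenderVerticle extends AbstractVerticle {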

  @Override
  public void start() throws Exception {
    // Create a Router
    Router router = Router.router(vertx);

    // Mount the handler for incoming request
    router.get("/send/:message").handler(this::sendMessage);

    // Creating Server
    HttpServer server = vertx.createHttpServer();

    // Handle every request using the router and start listening on port 8081
    server.requestHandler(router)
      .listen(8081)
      .onSuccess(started -> System.out.println(
        "*************** Server started on " + started.actualPort() + " *************"));
  }
  private void sendMessage(RoutingContext context) {

    // generating random id for message
    String uuid = UUID.randomUUID().toString();

    // create event bus object
    final EventBus eventBus = vertx.eventBus();

    final String message = context.request().getParam("message");

    // creating json object for message
    JsonObject entries = new JsonObject();
    entries.put("id", uuid);
    entries.put("message", message);
    entries.put("time", System.currentTimeMillis());

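    // Request-response: the message goes to a single consumer and we wait for its reply
    eventBus.request("incoming.message", entries, reply -> {
      if (reply.succeeded()) {
        context.json(reply.result().body());
      } else {
        // No consumer, a timeout, or an explicit failure: end the HTTP request
        // with an error instead of leaving the client waiting
        context.fail(500, reply.cause());
      }
    });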
  }
}

Source Code: http://jreact.com/index.php/2023/11/24/vertx-event-bus-example-01/

Publish-Subscribe Communication

In publish/subscribe communication, producers and consumers are decoupled even further. When a message is published to an address, every consumer registered on that address receives it, regardless of which producer sent it. This differs from point-to-point messaging, where only one consumer receives each message. It is not possible to specify a reply handler for a published message.

Publish/subscribe is useful when you are not sure how many verticles and handlers will be interested in a particular event. If you need message consumers to get back to the entity that sent the event, go for request-reply. Otherwise, opting for point-to-point versus publish/subscribe is a matter of functional requirements, mostly whether all consumers should process an event or just one consumer should.
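To make the difference between the two delivery rules concrete, here is a minimal sketch in plain Java. It is a toy model, not the Vert.x EventBus: the class `ToyEventBus` and its methods are hypothetical names that only mimic `consumer`, `send`, and `publish`. It also uses a strict round-robin for `send`, whereas Vert.x describes its own algorithm as non-strict.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model only: NOT the Vert.x EventBus, just its delivery rules in plain Java.
class ToyEventBus {
  // Handlers registered per address, in registration order.
  private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();
  // Number of messages sent so far per address, used to rotate between handlers.
  private final Map<String, Integer> sentCount = new HashMap<>();

  void consumer(String address, Consumer<String> handler) {
    handlers.computeIfAbsent(address, a -> new ArrayList<>()).add(handler);
  }

  // Point-to-point: exactly one registered handler receives the message.
  void send(String address, String body) {
    List<Consumer<String>> registered = handlers.getOrDefault(address, List.of());
    if (registered.isEmpty()) {
      return; // nobody is listening; this toy simply drops the message
    }
    int index = sentCount.merge(address, 1, Integer::sum) - 1;
    registered.get(index % registered.size()).accept(body);
  }

  // Publish/subscribe: every registered handler receives the message.
  void publish(String address, String body) {
    for (Consumer<String> handler : handlers.getOrDefault(address, List.of())) {
      handler.accept(body);
    }
  }

  public static void main(String[] args) {
    ToyEventBus bus = new ToyEventBus();
    bus.consumer("news-feed", m -> System.out.println("consumer 1: " + m));
    bus.consumer("news-feed", m -> System.out.println("consumer 2: " + m));

    bus.send("news-feed", "first");    // delivered to consumer 1 only
    bus.send("news-feed", "second");   // delivered to consumer 2 only
    bus.publish("news-feed", "third"); // delivered to both consumers
  }
}
```

With two consumers on the same address, each `send` reaches only one of them in turn, while `publish` reaches both. That is the functional question to settle when choosing a pattern: should one consumer process the event, or all of them?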

Example#2

Sender.java

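import io.vertx.core.AbstractVerticle;
import io.vertx.core.Launcher;
import io.vertx.core.eventbus.EventBus;

public class Sender extends AbstractVerticle {

  // "-cluster" needs a cluster manager (for example vertx-hazelcast) on the classpath
  public static void main(String[] args) {
    Launcher.executeCommand("run", Sender.class.getName(), "-cluster");
  }

  @Override
  public void start() throws Exception {

    EventBus eb = vertx.eventBus();

    // Publish a message to every subscriber of "news-feed" once per second
    vertx.setPeriodic(1000, v -> eb.publish("news-feed", "Some news!"));
  }
}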

Receiver.java

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Launcher;
import io.vertx.core.eventbus.EventBus;

public class Receiver extends AbstractVerticle {

  // "-cluster" needs a cluster manager (for example vertx-hazelcast) on the classpath
  public static void main(String[] args) {
    Launcher.executeCommand("run", Receiver.class.getName(), "-cluster");
  }

  @Override
  public void start() throws Exception {

    EventBus eb = vertx.eventBus();

    // Three consumers on the same address: each one receives every published message
    eb.consumer("news-feed", message -> System.out.println("Received news on consumer 1: " + message.body()));

    eb.consumer("news-feed", message -> System.out.println("Received news on consumer 2: " + message.body()));

    eb.consumer("news-feed", message -> System.out.println("Received news on consumer 3: " + message.body()));

    System.out.println("Ready!");
  }
}

Source Code: http://jreact.com/index.php/2023/12/06/vertx-event-bus-pub-sub/