Vert.x - Event Bus

About

The Vert.x event bus is the main tool for different verticles to communicate through asynchronous message passing.

Code: docs/apidocs/io/vertx/core/eventbus/EventBus.html

Example

For instance, suppose that we have one verticle for handling HTTP requests and another for managing access to the database. The event bus allows the HTTP verticle to send a request to the database verticle, which performs a SQL query and replies to the HTTP verticle.
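The request/reply flow described here can be sketched with a toy, JDK-only model. This is not the Vert.x API: ToyRequestBus, its methods, and the address "db.queries" are hypothetical names chosen for illustration, and the real event bus adds serialization, timeouts, and event-loop threading that this sketch ignores.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Toy model of request/reply over named addresses (NOT the Vert.x API).
class ToyRequestBus {
  private final Map<String, Function<String, String>> responders = new HashMap<>();

  // The "database side" registers a responder on an address.
  void consumer(String address, Function<String, String> responder) {
    responders.put(address, responder);
  }

  // The "HTTP side" sends a request and receives the reply asynchronously.
  CompletableFuture<String> request(String address, String body) {
    Function<String, String> responder = responders.get(address);
    if (responder == null) {
      return CompletableFuture.failedFuture(
          new IllegalStateException("No handler for address " + address));
    }
    // Run the responder on another thread so the caller is not blocked.
    return CompletableFuture.supplyAsync(() -> responder.apply(body));
  }
}

class ToyRequestDemo {
  public static void main(String[] args) {
    ToyRequestBus bus = new ToyRequestBus();
    // Stand-in for the database verticle: pretends to run the query.
    bus.consumer("db.queries", sql -> "rows for: " + sql);
    // Stand-in for the HTTP verticle: sends the request, handles the reply.
    bus.request("db.queries", "SELECT 1")
        .thenAccept(reply -> System.out.println("HTTP side got: " + reply))
        .join(); // wait only so the demo does not exit before the reply arrives
  }
}
```

The two sides share nothing except the address string, which is the decoupling the event bus is meant to provide.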

Features

Event-Bus:

Communication Pattern

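The event bus supports the following communication patterns: point-to-point messaging, request-response messaging, and publish/subscribe for broadcasting messages.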
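The difference between point-to-point delivery and publish/subscribe can be made concrete with a small in-memory model. The sketch below is a simplification under stated assumptions, not the Vert.x implementation: ToyEventBus is a hypothetical class, delivery is synchronous, and messages are plain strings. It mirrors the documented behavior that a published message reaches every handler on an address, while a sent message reaches only one of them, chosen in round-robin order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy, synchronous, single-JVM model of delivery semantics (NOT the Vert.x API).
class ToyEventBus {
  private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();
  private final Map<String, Integer> sendCount = new HashMap<>();

  // Register a handler; several handlers may share one address.
  void consumer(String address, Consumer<String> handler) {
    handlers.computeIfAbsent(address, a -> new ArrayList<>()).add(handler);
  }

  // Publish/subscribe: every handler on the address receives the message.
  void publish(String address, String message) {
    for (Consumer<String> handler : handlers.getOrDefault(address, List.of())) {
      handler.accept(message);
    }
  }

  // Point-to-point: exactly one handler receives the message (round-robin).
  void send(String address, String message) {
    List<Consumer<String>> list = handlers.getOrDefault(address, List.of());
    if (list.isEmpty()) {
      return; // nobody is listening on this address
    }
    int index = sendCount.merge(address, 1, Integer::sum) - 1;
    list.get(index % list.size()).accept(message);
  }
}

class ToyEventBusDemo {
  public static void main(String[] args) {
    ToyEventBus bus = new ToyEventBus();
    List<String> log = new ArrayList<>();
    bus.consumer("news", m -> log.add("A:" + m));
    bus.consumer("news", m -> log.add("B:" + m));
    bus.publish("news", "hello"); // delivered to A and B
    bus.send("news", "one");      // delivered to A only
    bus.send("news", "two");      // delivered to B only
    System.out.println(log);      // prints [A:hello, B:hello, A:one, B:two]
  }
}
```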

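The event bus allows verticles to communicate transparently, and not just within the same JVM process: when clustering is enabled, the event bus is distributed so that messages can reach verticles running on other nodes, and bridges (for example TCP and SockJS) expose it to external and browser applications.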

Management

Get

The vertx object gives access to the event bus. See vertx.eventBus()

Publisher/Consumer

Send a request message (publisher)

String queue = "queueName"; // The consumer address
JsonObject message = new JsonObject().put("pageId", 1);
DeliveryOptions options = new DeliveryOptions().addHeader("action", "get-ip");
vertx.eventBus().request(queue, message, options, reply -> {

  if (reply.succeeded()) {
    // The data from the db
    JsonObject body = (JsonObject) reply.result().body();

    // Use the json object
    boolean success = body.getBoolean("success");
    // ..........

  } else {
    // `context` is defined outside this snippet
    context.fail(reply.cause());
  }

});

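where queue is the address the consumer is registered on, message is the JSON body of the request, and options carries the action header that tells the consumer what to do.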

Receive a message (consumer)

String address = "queueName";
vertx.eventBus().consumer(address, this::onMessage);

where onMessage is the message handler. It checks the action header and acts accordingly. For instance:

public void onMessage(Message<JsonObject> message) {

  // Reject messages that do not say which action to perform
  if (!message.headers().contains("action")) {
    LOGGER.error("No action header specified for message with headers {} and body {}",
      message.headers(), message.body().encodePrettily());
    message.fail(ErrorCodes.NO_ACTION_SPECIFIED.ordinal(), "No action header specified");
    return;
  }
  String action = message.headers().get("action");

  // Dispatch on the action header
  switch (action) {
    case "get-ip":
      fetchIp(message);
      break;
    default:
      message.fail(ErrorCodes.BAD_ACTION.ordinal(), "Bad action: " + action);
  }
}
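The handler above uses an ErrorCodes enum that is not shown. A minimal sketch of what it might look like, together with the same header check isolated as plain Java, is given below. The enum definition, the ActionDispatcher class, and the ACCEPTED sentinel are assumptions made for illustration; only the two constant names come from the snippet above.

```java
import java.util.Map;

// Hypothetical definition: the original uses ErrorCodes without showing it.
enum ErrorCodes { NO_ACTION_SPECIFIED, BAD_ACTION }

// The header check from onMessage, isolated from Vert.x types for illustration.
class ActionDispatcher {
  static final int ACCEPTED = -1; // hypothetical sentinel: no failure

  // Returns ACCEPTED for a known action, otherwise the failure code that the
  // handler would pass to message.fail(...).
  static int check(Map<String, String> headers) {
    if (!headers.containsKey("action")) {
      return ErrorCodes.NO_ACTION_SPECIFIED.ordinal();
    }
    switch (headers.get("action")) {
      case "get-ip":
        return ACCEPTED;
      default:
        return ErrorCodes.BAD_ACTION.ordinal();
    }
  }
}

class ActionDispatcherDemo {
  public static void main(String[] args) {
    System.out.println(ActionDispatcher.check(Map.of()));                    // prints 0
    System.out.println(ActionDispatcher.check(Map.of("action", "nope")));    // prints 1
    System.out.println(ActionDispatcher.check(Map.of("action", "get-ip")));  // prints -1
  }
}
```

Because ordinal() follows declaration order, reordering the enum constants would silently change the failure codes seen by senders, so the order is worth treating as part of the contract.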

Documentation