Vert.x - Event Bus

About

The Vert.x event bus is the main tool for different verticles to communicate through asynchronous message passing.

Code: io/vertx/core/eventbus/EventBus

Example

For instance, suppose we have one verticle that handles HTTP requests and another that manages access to the database. The event bus lets the HTTP verticle send a request to the database verticle, which performs a SQL query and replies to the HTTP verticle.

Features

Event bus:

  • Verticles communicate over it through asynchronous message passing.
  • Destinations (addresses) are free-form strings.

Communication Pattern

The event bus supports the following communication patterns:

  • point-to-point messaging,
  • request-response messaging, and
  • publish/subscribe for broadcasting messages.
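
The difference between the three patterns is mostly a question of who receives a message and whether a reply comes back. The sketch below models that with a toy, synchronous, in-memory bus written against the JDK only. ToyBus and its methods are hypothetical names chosen for illustration; this is not the Vert.x API or its implementation, and the round-robin choice among handlers is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Toy, synchronous stand-in for an event bus (hypothetical, not the Vert.x API). */
class ToyBus {
    // Handlers registered per address; each handler returns a reply string.
    private final Map<String, List<Function<String, String>>> handlers = new HashMap<>();
    // Number of point-to-point deliveries so far per address, used for round-robin.
    private final Map<String, Integer> deliveries = new HashMap<>();

    /** Registers a handler on a free-form string address. */
    void consumer(String address, Function<String, String> handler) {
        handlers.computeIfAbsent(address, a -> new ArrayList<>()).add(handler);
    }

    /** Publish/subscribe: every handler on the address gets the message. Returns how many did. */
    int publish(String address, String message) {
        List<Function<String, String>> list =
            handlers.getOrDefault(address, Collections.emptyList());
        for (Function<String, String> handler : list) {
            handler.apply(message); // replies are ignored when broadcasting
        }
        return list.size();
    }

    /** Request-response: exactly one handler gets the message and its return value is the reply. */
    String request(String address, String message) {
        List<Function<String, String>> list = handlers.get(address);
        if (list == null || list.isEmpty()) {
            throw new IllegalStateException("No handlers for address " + address);
        }
        // merge() increments the counter and returns the new value, so the first index is 0.
        int index = deliveries.merge(address, 1, Integer::sum) - 1;
        return list.get(index % list.size()).apply(message);
    }

    /** Point-to-point: one handler gets the message, the reply is discarded. */
    void send(String address, String message) {
        request(address, message);
    }

    public static void main(String[] args) {
        ToyBus bus = new ToyBus();
        bus.consumer("db.queries", sql -> "rows for: " + sql);
        bus.consumer("db.queries", sql -> "cached rows for: " + sql);
        System.out.println(bus.request("db.queries", "SELECT 1")); // prints "rows for: SELECT 1"
        System.out.println(bus.publish("db.queries", "flush"));    // prints 2
    }
}
```

In this model a request reaches a single handler while a publish reaches all of them. The real event bus delivers messages asynchronously, which this sketch deliberately leaves out.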

The event bus also lets verticles communicate transparently beyond a single JVM process:

  • when network clustering is activated, the event bus is distributed so that messages can be sent to verticles running on other application nodes,
  • the event bus can be accessed through a simple TCP protocol so that third-party applications can communicate with it,
  • the event bus can also be exposed over general-purpose messaging bridges (e.g., AMQP, STOMP),
  • a SockJS bridge allows web applications to communicate seamlessly over the event bus from JavaScript running in the browser, receiving and publishing messages just like any verticle would.

Management

Get

The vertx object gives access to the event bus. See vertx.eventBus()

Publisher/Consumer

Send a request message (publisher)

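import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.json.JsonObject;

String queue = "queueName"; // The consumer address
JsonObject message = new JsonObject().put("pageId", 1);
// The "action" header tells the consumer which operation to perform
DeliveryOptions options = new DeliveryOptions().addHeader("action", "get-ip");
vertx.eventBus().request(queue, message, options, reply -> {

  if (reply.succeeded()) {
    // The data from the db
    JsonObject body = (JsonObject) reply.result().body();

    // Use the json object
    boolean success = body.getBoolean("success");
    // ..........

  } else {
    // context is assumed to be in scope here (for example a Vert.x Web RoutingContext)
    context.fail(reply.cause());
  }

});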

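where:

  • queue is the address of the consumer that should receive the request
  • message is the JSON body of the request
  • options carries the message headers, here the action header
  • reply is the asynchronous result: on success it holds the consumer's reply message, on failure the cause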

Receive a message (consumer)

String address = "queueName";
vertx.eventBus().consumer(address, this::onMessage);

where:

  • address is the event bus address on which this consumer listens
  • this::onMessage defines the function to be called when a message is received

The message handler checks the action header and acts accordingly. For instance:

// LOGGER, ErrorCodes and fetchIp are assumed to be defined elsewhere in the verticle
public void onMessage(Message<JsonObject> message) {

  // Reject messages that do not say which action to perform
  if (!message.headers().contains("action")) {
    LOGGER.error("No action header specified for message with headers {} and body {}",
      message.headers(), message.body().encodePrettily());
    message.fail(ErrorCodes.NO_ACTION_SPECIFIED.ordinal(), "No action header specified");
    return;
  }
  String action = message.headers().get("action");

  // Dispatch on the action header
  switch (action) {
    case "get-ip":
      fetchIp(message);
      break;
    default:
      message.fail(ErrorCodes.BAD_ACTION.ordinal(), "Bad action: " + action);
  }
}
