Vert.x - Event Bus

1 - About

The Vert.x event bus is the main tool for different verticles to communicate through asynchronous message passing.

Code: io/vertx/core/eventbus/EventBus

3 - Example

For instance suppose that we have a verticle for dealing with HTTP requests, and a verticle for managing access to the database. The event bus allows the HTTP verticle to send a request to the database verticle that performs a SQL query, and responds back to the HTTP verticle

4 - Features

Event-Bus:

  • The Vert.x event bus is the main tool for different verticles to communicate through asynchronous message passing.
  • destinations are free-form strings.

5 - Communication Pattern

The event bus supports the following communication patterns:

  • point-to-point messaging, and
  • request-response messaging and
  • publish / subscribe for broadcasting messages.

The event bus allows verticles to transparently communicate not just within the same JVM process:

  • when network clustering is activated, the event bus is distributed so that messages can be sent to verticles running on other application nodes,
  • the event-bus can be accessed through a simple TCP protocol for third-party applications to communicate,
  • the event-bus can also be exposed over general-purpose messaging bridges (e.g, AMQP, Stomp),
  • a SockJS bridge allows web applications to seamlessly communicate over the event bus from JavaScript running in the browser by receiving and publishing messages just like any verticle would do.

6 - Management

6.1 - Get

The vertx object. gives access to the event bus. See vertx.EventBus()

6.2 - Publisher/Consumer

6.2.1 - Send a request message (publisher)


String queue = 'queueName'; // The consumer Address 
JsonObject message = new JsonObject().put("pageId", 1);
DeliveryOptions options = new DeliveryOptions().addHeader("action", "get-ip");
vertx.eventBus().request(queue , message, options, reply -> {

  if (reply.succeeded()) {
    // The data from the db
	JsonObject body = (JsonObject) reply.result().body();

    // Use the json object
	boolean success = body.getBoolean("success");
	..........

  } else {
	context.fail(reply.cause());
  }
  
});

where:

6.2.2 - Receive a message (consumer)


String address = 'queueName';
vertx.eventBus().consumer(address, this::onMessage);

where:

  • address is where this consumer can be called
  • this::onMessage defines the function to be called when a message is received

where the message handler check the header and takes actions accordingly. For instance:


public void onMessage(Message<JsonObject> message) {

    if (!message.headers().contains("action")) {
      LOGGER.error("No action header specified for message with headers {} and body {}",
        message.headers(), message.body().encodePrettily());
      message.fail(ErrorCodes.NO_ACTION_SPECIFIED.ordinal(), "No action header specified");
      return;
    }
    String action = message.headers().get("action");

    switch (action) {
      case "get-ip":
        fetchIp(message);
        break;
      default:
        message.fail(ErrorCodes.BAD_ACTION.ordinal(), "Bad action: " + action);
    }
  }

7 - Documentation


Data Science
Data Analysis
Statistics
Data Science
Linear Algebra Mathematics
Trigonometry

Powered by ComboStrap