Michael Barker edited this page Feb 3, 2020 · 22 revisions

Aeron Cluster Tutorial

Important
This tutorial is currently a work in progress, information may be missing or incomplete.

This tutorial assumes that the user already has a basic working knowledge of Aeron Messaging.

1. Introduction

Aeron Cluster is a framework for building high-performance, in-memory, fault-tolerant services. It implements the Raft Consensus Algorithm to provide log replication, which allows multiple nodes to maintain the same state, and automated leader election, which ensures that there is a single leader within the cluster. Systems built using Aeron Cluster have a linearizable consistency model and can cope with partial system failure, including failure of the leader node, as long as a quorum of ⌊n/2⌋ + 1 nodes remains available.
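
The quorum rule can be made concrete with a little arithmetic. The following is a minimal sketch only; QuorumSketch and its method names are hypothetical helpers for illustration and are not part of Aeron.

```java
// Illustrative only: QuorumSketch is a hypothetical helper, not an Aeron class.
final class QuorumSketch
{
    /** Smallest number of nodes that forms a majority of the cluster: floor(n/2) + 1. */
    static int quorumSize(final int clusterSize)
    {
        return (clusterSize / 2) + 1; // integer division gives the floor for positive sizes
    }

    /** Number of nodes that can be lost while a quorum is still available. */
    static int tolerableFailures(final int clusterSize)
    {
        return clusterSize - quorumSize(clusterSize);
    }

    public static void main(final String[] args)
    {
        for (final int n : new int[] {1, 3, 4, 5})
        {
            System.out.println(
                n + " nodes: quorum=" + quorumSize(n) + ", tolerated failures=" + tolerableFailures(n));
        }
    }
}
```

Note that a four-node cluster tolerates no more failures than a three-node cluster, which is one reason odd cluster sizes are the usual choice.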

1.1. Raft Basics

It is not the intention of this tutorial to give a full description of the Raft Consensus Algorithm; however, it is difficult to explain Aeron Cluster without a few definitions up front. The key concepts are (some are Aeron specific):

  • Node, a physical server, container instance, VM or group of processes that represents a logical server within a cluster.

  • Leader, a node within the cluster responsible for replicating messages to the other nodes and waiting for acknowledgements.

  • Follower, other nodes within the cluster that receive messages replicated from the leader.

  • Election, process by which the cluster agrees on a new leader.

  • Client, node external to the cluster.

  • Ingress, the messages from a client into the cluster.

  • Egress, response messages from the cluster back to a client.

  • Snapshot, serialised representation of the application logic’s state at a point in time.

1.2. Replication and Recovery

Within Aeron Cluster, replication (ensuring that a follower node has the same state as the leader) and recovery (restoring a stopped node to its previous state)[1] are the same problem, or at least similar enough that they use the same mechanism. They rely on three things. Firstly, some known initial state: this could either be the state when first provisioned (a 'null' or empty state) or a snapshot of the state at a point in time. Secondly, an ordered log of all input messages, which is handled by Cluster’s Raft implementation. Thirdly, the application logic needs to be deterministic, i.e. it must derive all of its resulting state and output events from the initial state and the input messages, such that for the same inputs it will always produce the same result. This can be trickier than it first seems, as it excludes some functionality that we may normally take for granted.
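
To illustrate why these three ingredients are enough, here is a minimal sketch in plain Java with no Aeron dependency. ReplaySketch, its order-sensitive apply function and the sample log are all made up for illustration; the point is only that replaying the full log from the empty state and replaying the tail of the log from a snapshot arrive at the same state.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: a toy deterministic state machine, not Aeron code.
final class ReplaySketch
{
    /** Deterministic and order-sensitive: the result depends only on the prior state and the input. */
    static long apply(final long state, final long input)
    {
        return state * 31 + input;
    }

    /** Applies log entries from fromIndex (inclusive) to the end, starting from initialState. */
    static long replay(final long initialState, final List<Long> log, final int fromIndex)
    {
        long state = initialState;
        for (int i = fromIndex; i < log.size(); i++)
        {
            state = apply(state, log.get(i));
        }
        return state;
    }

    public static void main(final String[] args)
    {
        final List<Long> log = Arrays.asList(5L, 7L, 11L);

        final long fullReplay = replay(0L, log, 0);                  // recover from the empty state
        final long snapshot = replay(0L, log.subList(0, 2), 0);      // state captured after two messages
        final long fromSnapshot = replay(snapshot, log, 2);          // recover from snapshot plus log tail

        System.out.println(fullReplay == fromSnapshot); // true
    }
}
```

A follower replicating live messages is in the same position as the node recovering from a snapshot: both start from a known state and apply the same ordered inputs.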

Tip

Be careful to ensure determinism in a clustered service. Common pitfalls include:

  • Timestamps

  • Random Numbers

  • Reading from configuration files

  • Iterating over some types of collections, e.g. HashMaps

Timestamps are provided by Aeron Cluster (as will be shown later). Random numbers are best avoided; if they are needed, use a seed fixed in the initial state or supplied via a message. Configuration changes should be pushed into the application logic via messages. Iterating over collections should happen in a known order, either by using a collection that preserves ordering (e.g. TreeMap, LinkedHashMap) or by sorting the values from the collection before sending them as messages.
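
Two of these remedies can be sketched with the standard library alone. The class and method names below are hypothetical; the sketch shows a sorted view over an unordered map and a random sequence driven by a fixed seed, both of which give the same result on every node and on every replay.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

// Illustrative only: DeterminismSketch is a hypothetical helper, not an Aeron class.
final class DeterminismSketch
{
    /** Returns the keys in sorted order, regardless of how the source map orders them. */
    static List<String> orderedKeys(final Map<String, Long> bids)
    {
        return new ArrayList<>(new TreeMap<>(bids).keySet());
    }

    /** The same seed always produces the same sequence of values. */
    static long[] draws(final long seed, final int count)
    {
        final Random random = new Random(seed);
        final long[] values = new long[count];
        for (int i = 0; i < count; i++)
        {
            values[i] = random.nextLong();
        }
        return values;
    }

    public static void main(final String[] args)
    {
        final Map<String, Long> bids = new HashMap<>();
        bids.put("charlie", 3L);
        bids.put("alice", 1L);
        bids.put("bob", 2L);

        System.out.println(orderedKeys(bids));                              // [alice, bob, charlie]
        System.out.println(Arrays.equals(draws(42L, 3), draws(42L, 3)));    // true
    }
}
```

In a clustered service the seed itself would need to come from the snapshot or from an input message, so that every node uses the same one.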

As it may be impractical to replay all data from the "beginning of time", the system should take snapshots periodically, e.g. daily or hourly. The frequency of snapshots should be determined by the volume of data into the system, the throughput of the business logic, and the desired mean time to recovery. It is not uncommon to have systems that may take an hour or two to recover from a day’s worth of messages; in those systems snapshotting every 30 minutes may be more appropriate.
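
As a rough worked example of that trade-off, the sketch below estimates worst-case replay time from the snapshot interval. It assumes, purely for illustration, that messages arrive at a uniform rate and that replay runs at a constant multiple of real time; RecoveryTimeSketch and its methods are hypothetical names.

```java
// Illustrative only: a back-of-the-envelope estimate under simplifying assumptions.
final class RecoveryTimeSketch
{
    /** How many times faster than real time the log can be replayed. */
    static double replaySpeedup(final double liveSeconds, final double replaySeconds)
    {
        return liveSeconds / replaySeconds;
    }

    /** Worst case: the node stopped just before the next snapshot was due. */
    static double worstCaseReplaySeconds(final double snapshotIntervalSeconds, final double speedup)
    {
        return snapshotIntervalSeconds / speedup;
    }

    public static void main(final String[] args)
    {
        // A day of messages (86,400 s) that takes two hours (7,200 s) to replay.
        final double speedup = replaySpeedup(86_400, 7_200);

        System.out.println("Speedup: " + speedup);
        System.out.println("Daily snapshots, worst case (s): " + worstCaseReplaySeconds(86_400, speedup));
        System.out.println("30 minute snapshots, worst case (s): " + worstCaseReplaySeconds(1_800, speedup));
    }
}
```

Under these assumptions, moving from daily to half-hourly snapshots reduces the worst-case replay from two hours to a few minutes, at the cost of taking and storing more snapshots.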

1.3. Components of Aeron Cluster

One of the key design goals of Aeron is to build a system that is highly composable. For example, Aeron Archive, which provides a means to persist an Aeron stream, is used by Aeron Cluster to persist the Raft log, and messages are communicated around the cluster using Aeron’s Media Driver. Aeron Cluster is therefore an aggregation of a number of existing Aeron components and a few new ones. To successfully run a cluster node it is necessary to have one (or at least one in the case of ClusteredServiceContainer) of each of the Aeron components running. Because all communication between these components within a single node uses IPC, they can all be run in the same process, in separate processes, or in any arbitrary combination.

1.3.1. MediaDriver

The MediaDriver is the means by which data is moved to, from, and around the cluster. Aeron Cluster reuses the Publication and Subscription functionality that Aeron already provides to handle all distributed and inter-process communication.

Note
Currently only the Java implementation of the MediaDriver is supported for use in Aeron Cluster.

1.3.2. Archive

Raft is primarily a log replication protocol, so Aeron Cluster uses Aeron Archive to persist its log.

1.3.3. ConsensusModule

The Consensus Module is the key component within Aeron Cluster and ensures that the nodes have a consistent copy of the replicated log. The Consensus Module will co-ordinate with the Archive to persist messages, replicate/ack messages to/from other nodes, and deliver messages through to the Clustered Services.

1.3.4. ClusteredServiceContainer

This is the component that runs the developer-supplied application logic. There can be one or more clustered services per node. Aeron Cluster provides a container for the application logic, and there is a ClusteredService interface that must be implemented by the application; it receives all of the appropriate events and messages from the cluster.

2. Getting Started with Aeron Cluster

Install JDK 8 or JDK 11 and ensure that $JAVA_HOME is set correctly and $JAVA_HOME/bin is on your $PATH.

The typical way to get started with Aeron in your own application is to use the aeron-all jar. However, for this tutorial we recommend that you check out the source code from GitHub.

2.1. Using the Source From this Tutorial

In order to look at the examples and run the code from this tutorial you will need to check out the full Aeron source code.

git clone https://github.com/real-logic/aeron.git

The location on your computer where this gets checked out to will be referred to as <AERON_HOME>. You will probably want to build Aeron now to make sure that you have the environment working correctly.

cd <AERON_HOME>
./gradlew

And to build the tutorial (if you haven’t already):

./gradlew asciidoctor

This will create an HTML formatted version of the tutorial at <AERON_HOME>/aeron-samples/build/asciidoc/html5/ClusterTutorial.html

This tutorial will include snippets of code from the working example. The core scripts and classes for this tutorial are:

  • <AERON_HOME>/aeron-samples/src/main/java/io/aeron/samples/tutorial/cluster/

    • BasicAuctionClusteredService.java

    • BasicAuctionClusteredServiceNode.java

    • BasicAuctionClusterClient.java

  • <AERON_HOME>/aeron-samples/scripts/cluster/

    • basic-auction-cluster

    • basic-auction-client

You may wish to open these up in your favourite IDE or editor while we proceed with the tutorial.

3. Implementing a Clustered Service

The first step to setting up a cluster is to implement the application logic. For this tutorial we are going to implement a simple auction service. It will have one auction and will track a best price and an id for the customer that bid that price. To properly demonstrate the state management and recovery features of Cluster it is important to have some functionality that is stateful, rather than something that is stateless like an echo service.
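
The auction logic itself can be very small. The following is a minimal sketch of what such a class might look like; it is not the tutorial's own source. The method names mirror the ones called from the service snippets in this tutorial, but the starting values and the rule that a bid must strictly exceed the current best price are assumptions made for illustration.

```java
// Illustrative only: a guess at the shape of the auction logic, not the tutorial's source.
final class AuctionSketch
{
    private long bestPrice = 0;                  // assumed starting price, in cents
    private long currentWinningCustomerId = -1;  // assumed "no winner yet" marker

    /** Restores state captured in a snapshot. */
    void loadInitialState(final long price, final long customerId)
    {
        bestPrice = price;
        currentWinningCustomerId = customerId;
    }

    /** Accepts the bid only if it beats the current best price (assumed rule). */
    boolean attemptBid(final long price, final long customerId)
    {
        if (price <= bestPrice)
        {
            return false;
        }

        bestPrice = price;
        currentWinningCustomerId = customerId;
        return true;
    }

    long getBestPrice()
    {
        return bestPrice;
    }

    long getCurrentWinningCustomerId()
    {
        return currentWinningCustomerId;
    }

    public static void main(final String[] args)
    {
        final AuctionSketch auction = new AuctionSketch();
        System.out.println(auction.attemptBid(100, 1)); // true
        System.out.println(auction.attemptBid(100, 2)); // false, does not beat 100
        System.out.println(auction.attemptBid(150, 2)); // true
    }
}
```

Because the outcome of each bid depends only on the current state and the bid itself, this logic is deterministic and therefore safe to replay.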

Note
In order to show the various features of Cluster in an understandable way we’ve kept the various aspects of the code (data serialisation, application logic, Cluster integration) very close together. It is unlikely that you would do this in a real world application. You would probably want to have greater separation of concerns and perhaps use libraries to handle functionality like serialisation.

We must define the link between the application logic and Aeron Cluster. This is done by implementing the ClusteredService interface. The ClusteredService interface defines a number of callbacks that inform us of messages and lifecycle events as they occur, as well as providing some of the hooks that our service can use to interact with the cluster (e.g. sending response messages and taking snapshots).

public class BasicAuctionClusteredService implements ClusteredService

3.1. Start Up

The clustered service container will notify the service that it has started using the onStart callback. This will occur before any input messages are received, either from log replay or live from a client. It is during this phase that we need to load the initial state of the service. Aeron Cluster passes in an Image that will contain the most recent valid snapshot of the service. The service should take care of deserializing the data from the image and initializing its state. We will come back to the details of how the snapshot is loaded after we’ve looked at how messages are handled and how a snapshot is stored.

public void onStart(final Cluster cluster, final Image snapshotImage)
{
    this.cluster = cluster;     // (1)
    if (null != snapshotImage)  // (2)
    {
        loadSnapshot(cluster, snapshotImage);
    }
}
  1. Take a reference to the cluster, we will need this in the future.

  2. The snapshot can be null (this occurs the first time that the service is started).

3.2. Handling Messages

The onSessionMessage callback is the main entry point for requests coming into the cluster. Messages reach here by being published to Cluster’s ingress channel. This method will also be passed messages during replay from a log. It is from within this method we will interact with our application logic. Cluster also provides a reliable timestamp as a parameter to this method. As mentioned earlier this is one of the challenges of building deterministic systems. Use this value as the timestamp within your application state and it will be consistent under replay.

private final Auction auction = new Auction();
public void onSessionMessage(
    final ClientSession session,
    final long timestamp,
    final DirectBuffer buffer,
    final int offset,
    final int length,
    final Header header)
{
    final long correlationId = buffer.getLong(offset + CORRELATION_ID_OFFSET);     // (1)
    final long customerId = buffer.getLong(offset + CUSTOMER_ID_OFFSET);
    final long price = buffer.getLong(offset + PRICE_OFFSET);

    final boolean bidSucceeded = auction.attemptBid(price, customerId);            // (2)

    if (null != session)                                                           // (3)
    {
        egressMessageBuffer.putLong(CORRELATION_ID_OFFSET, correlationId);         // (4)
        egressMessageBuffer.putLong(CUSTOMER_ID_OFFSET, auction.getCurrentWinningCustomerId());
        egressMessageBuffer.putLong(PRICE_OFFSET, auction.getBestPrice());
        egressMessageBuffer.putByte(BID_SUCCEEDED_OFFSET, bidSucceeded ? (byte)1 : (byte)0);

        while (session.offer(egressMessageBuffer, 0, EGRESS_MESSAGE_LENGTH) < 0)   // (5)
        {
            cluster.idle();                                                        // (6)
        }
    }
}

For our input message we have three fields: a correlationId, which is the customer-supplied identifier for the message, a customerId to identify the customer placing the bid, and a price (we’ve used a long for this, which represents the value in cents).

  1. Pull the data out of the message. This is similar to the pattern used in an Aeron Subscription’s onFragment callback when doing a poll.

  2. Execute the business logic, in our case this is applying the incoming bid to the auction to see if it is a winner.

  3. The ClientSession allows the service to get information about the calling client, and also provides a means to return responses back to the client. However, it will be null during recovery, so we need to check for that state and not offer a response in that case.

  4. Serialise response message.

  5. Calling offer on the client session will send the response on the egress channel. Make sure that the return value for offer is checked as it is a non-blocking call and not guaranteed to succeed.

  6. When doing any busy loops within the clustered application use Cluster::idle(int) within the wait loop to allow the service to pause in a friendly manner. It will take care of handling thread interrupts and ensure the node fails correctly.
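
The fixed-offset encoding used in steps 1 and 4 can be sketched with java.nio.ByteBuffer standing in for the Agrona buffers used by the tutorial. The offset values below are assumptions (three consecutive 8-byte longs); the real constants are defined in the tutorial source and may differ, as may the byte order.

```java
import java.nio.ByteBuffer;

// Illustrative only: assumed offsets and a ByteBuffer stand-in, not the tutorial's constants.
final class BidMessageLayoutSketch
{
    static final int CORRELATION_ID_OFFSET = 0;
    static final int CUSTOMER_ID_OFFSET = CORRELATION_ID_OFFSET + Long.BYTES;
    static final int PRICE_OFFSET = CUSTOMER_ID_OFFSET + Long.BYTES;
    static final int BID_MESSAGE_LENGTH = PRICE_OFFSET + Long.BYTES;

    /** Writes the three fields relative to the start of the message within the buffer. */
    static void encode(
        final ByteBuffer buffer, final int offset, final long correlationId, final long customerId, final long price)
    {
        buffer.putLong(offset + CORRELATION_ID_OFFSET, correlationId);
        buffer.putLong(offset + CUSTOMER_ID_OFFSET, customerId);
        buffer.putLong(offset + PRICE_OFFSET, price);
    }

    /** Reads the fields back as {correlationId, customerId, price}. */
    static long[] decode(final ByteBuffer buffer, final int offset)
    {
        return new long[]
        {
            buffer.getLong(offset + CORRELATION_ID_OFFSET),
            buffer.getLong(offset + CUSTOMER_ID_OFFSET),
            buffer.getLong(offset + PRICE_OFFSET)
        };
    }

    public static void main(final String[] args)
    {
        final ByteBuffer buffer = ByteBuffer.allocate(64);
        encode(buffer, 4, 1001L, 7L, 12_500L); // the message need not start at index 0
        final long[] fields = decode(buffer, 4);
        System.out.println(fields[0] + " " + fields[1] + " " + fields[2]);
    }
}
```

Always adding the supplied offset matters because the callback receives a view into a larger buffer, and the message rarely starts at index zero.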

3.3. Storing State

As was mentioned earlier we need to regularly take snapshots of the service’s state in order to reduce the mean time to recovery and facilitate release migration. There is a callback onTakeSnapshot that will be called when it is time to snapshot the state of the service.

Aeron Cluster provides an ExclusivePublication to write a snapshot as the serialised representation of the application logic’s state. For real world applications snapshotting can become tricky. The two big concerns will be ensuring that snapshots are written in a consistent manner and dealing with fragmentation of the application state across messages. For now, our state is so simple that neither of those will impact us.

public void onTakeSnapshot(final ExclusivePublication snapshotPublication)
{
    snapshotBuffer.putLong(SNAPSHOT_CUSTOMER_ID_OFFSET, auction.getCurrentWinningCustomerId()); // (1)
    snapshotBuffer.putLong(SNAPSHOT_PRICE_OFFSET, auction.getBestPrice());

    while (snapshotPublication.offer(snapshotBuffer, 0, SNAPSHOT_MESSAGE_LENGTH) < 0)  // (2)
    {
        cluster.idle();
    }
}
  1. Write the persistent part of the application logic to a message buffer. In our case, the currently winning customer id and bid price.

  2. Write the message to the publication, again we need to check the return from the offer call and use Cluster::idle inside of the busy wait loop.
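
Both the egress path and the snapshot path use the same offer-and-retry shape. The sketch below shows that shape in isolation, using a hypothetical FlakyPublication stand-in that rejects the first few offers; it is not an Aeron class, and Thread.yield() stands in for the idle call the service makes on the cluster.

```java
// Illustrative only: FlakyPublication is a made-up stand-in for a non-blocking publication.
final class OfferRetrySketch
{
    static final class FlakyPublication
    {
        private int rejectionsRemaining;
        private long position;

        FlakyPublication(final int rejections)
        {
            this.rejectionsRemaining = rejections;
        }

        /** Non-blocking: returns the new position on success or a negative value if not accepted. */
        long offer(final byte[] message)
        {
            if (rejectionsRemaining > 0)
            {
                rejectionsRemaining--;
                return -1;
            }

            position += message.length;
            return position;
        }

        long position()
        {
            return position;
        }
    }

    /** Retries until the message is accepted and returns the number of times it had to idle. */
    static int offerWithRetry(final FlakyPublication publication, final byte[] message)
    {
        int idleCount = 0;
        while (publication.offer(message) < 0)
        {
            Thread.yield(); // the clustered service would idle via the cluster here instead
            idleCount++;
        }
        return idleCount;
    }

    public static void main(final String[] args)
    {
        final FlakyPublication publication = new FlakyPublication(3);
        System.out.println("Idled " + offerWithRetry(publication, new byte[16]) + " times");
    }
}
```

This sketch treats every negative result as retryable. A real publication can return different negative codes, and a production loop would typically stop retrying on results that cannot succeed, such as a closed publication.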

3.4. Loading State

Our onStart implementation called a method to load the snapshot. Now that we have seen how the snapshot is written, we can look at how it is loaded. The Image provided to onStart has a method that will indicate if there is no more data available. However, application code should also encode enough information that the end of the stream can be detected from the stored messages. The two approaches can then be used together to sanity check that all of the data has been received.

private void loadSnapshot(final Cluster cluster, final Image snapshotImage)
{
    final MutableBoolean allDataLoaded = new MutableBoolean(false);

    while (!snapshotImage.isEndOfStream())                                                 // (1)
    {
        final int fragmentsPolled = snapshotImage.poll((buffer, offset, length, header) -> // (2)
        {
            assert length >= SNAPSHOT_MESSAGE_LENGTH;                                      // (3)

            final long customerId = buffer.getLong(offset + SNAPSHOT_CUSTOMER_ID_OFFSET);
            final long price = buffer.getLong(offset + SNAPSHOT_PRICE_OFFSET);

            auction.loadInitialState(price, customerId);                                   // (4)

            allDataLoaded.set(true);
        }, 1);

        if (allDataLoaded.value)                                                           // (5)
        {
            break;
        }

        cluster.idle(fragmentsPolled);                                                     // (6)
    }

    assert snapshotImage.isEndOfStream();                                                  // (7)
    assert allDataLoaded.value;
}
  1. We can use the method Image::isEndOfStream to determine if there is going to be any more input.

  2. Because our snapshot is a stream of messages written to a publication, we use the Image’s poll method for extracting data from the snapshot.

  3. Our total snapshot length (16 bytes) is going to be smaller than any reasonable MTU therefore we can assume that all of the data will come in a single message.

  4. Once all of the data is loaded we can initialise the application logic state from the snapshot.

  5. Once we’ve loaded all of the data for the application we can break out of the snapshot loading loop.

  6. Again make sure we use Cluster::idle in the tight loops. It could take time for the snapshot to be propagated to the service so the number of fragments can be zero.

  7. It is also worthwhile having some sanity checks; these ensure that the snapshot store/load code and Aeron agree on where the end of the input data is. We’ve used asserts, but other mechanisms (e.g. log messages, exceptions) could also be used to indicate an issue.
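
The store and load sides have to agree on the snapshot layout, which is easy to check with a round trip. The sketch below uses java.nio.ByteBuffer as a stand-in for the tutorial's buffers. The 16-byte total comes from the text above; the field order and offset values are assumptions, and the length check is written as an exception instead of an assert.

```java
import java.nio.ByteBuffer;

// Illustrative only: assumed snapshot layout, customer id followed by price, 8 bytes each.
final class SnapshotCodecSketch
{
    static final int SNAPSHOT_CUSTOMER_ID_OFFSET = 0;
    static final int SNAPSHOT_PRICE_OFFSET = SNAPSHOT_CUSTOMER_ID_OFFSET + Long.BYTES;
    static final int SNAPSHOT_MESSAGE_LENGTH = SNAPSHOT_PRICE_OFFSET + Long.BYTES;

    /** Serialises the persistent part of the auction state into a new buffer. */
    static ByteBuffer store(final long customerId, final long price)
    {
        final ByteBuffer buffer = ByteBuffer.allocate(SNAPSHOT_MESSAGE_LENGTH);
        buffer.putLong(SNAPSHOT_CUSTOMER_ID_OFFSET, customerId);
        buffer.putLong(SNAPSHOT_PRICE_OFFSET, price);
        return buffer;
    }

    /** Returns {customerId, price}, rejecting fragments too short to hold a whole snapshot. */
    static long[] load(final ByteBuffer buffer, final int offset, final int length)
    {
        if (length < SNAPSHOT_MESSAGE_LENGTH)
        {
            throw new IllegalStateException("Snapshot fragment too short: " + length);
        }

        return new long[]
        {
            buffer.getLong(offset + SNAPSHOT_CUSTOMER_ID_OFFSET),
            buffer.getLong(offset + SNAPSHOT_PRICE_OFFSET)
        };
    }

    public static void main(final String[] args)
    {
        final ByteBuffer snapshot = store(7L, 12_500L);
        final long[] state = load(snapshot, 0, SNAPSHOT_MESSAGE_LENGTH);
        System.out.println("customerId=" + state[0] + ", price=" + state[1]);
    }
}
```

Larger states that span several messages would additionally need an explicit end marker or a count, so that the loader can tell a complete snapshot from a truncated one.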

3.5. Other events

There are a number of other events received by the ClusteredService interface, such as timer events, clients connecting to and disconnecting from the cluster, and leadership and role changes. We won’t look at these in this tutorial.

4. Configuring a Cluster

Now that we have our application implemented we can move on to running it in a cluster. Because there are a number of moving parts to setting up a cluster node, one of the trickiest parts of using Aeron Cluster is getting the configuration correct. We are going to start with a static three-node cluster with a simplified configuration where all of the components (MediaDriver, Archive, ConsensusModule and ClusteredServiceContainer) for a single node are hosted in a single process. We will then start the cluster as three separate processes.

4.1. Running Multiple Nodes on the Same Host

In a production deployment you will likely want to run the three instances on three separate servers; however, for our example we just want to run the services on a single machine. This does make port allocation a concern, as each node within the cluster needs to bind to a number of ports, so we need to make sure there are no clashes. We could do this with VMs or containers, but in the interest of simplicity we are going to specify a port range for each node, i.e.:

  • Node 0, ports: 9000-9099

  • Node 1, ports: 9100-9199

  • Node 2, ports: 9200-9299

private static final int PORT_BASE = 9000;
private static final int PORTS_PER_NODE = 100;
private static final int ARCHIVE_CONTROL_REQUEST_PORT_OFFSET = 1;
private static final int ARCHIVE_CONTROL_RESPONSE_PORT_OFFSET = 2;
public static final int CLIENT_FACING_PORT_OFFSET = 3;
private static final int MEMBER_FACING_PORT_OFFSET = 4;
private static final int LOG_PORT_OFFSET = 5;
private static final int TRANSFER_PORT_OFFSET = 6;
private static final int LOG_CONTROL_PORT_OFFSET = 7;

static int calculatePort(final int nodeId, final int offset)
{
    return PORT_BASE + (nodeId * PORTS_PER_NODE) + offset;
}
private static String udpChannel(final int nodeId, final String hostname, final int portOffset)
{
    final int port = calculatePort(nodeId, portOffset);
    return new ChannelUriStringBuilder()
        .media("udp")
        .termLength(64 * 1024)
        .endpoint(hostname + ":" + port)
        .build();
}

While we don’t really need 100 ports for each node (closer to 10), it does make it clear which ports are assigned to each service. Each endpoint will be an offset from the first port in a node’s range. E.g. the archive control request port has an offset of 1, so for Node 2 it will have port 9201.
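
The port arithmetic can be checked in isolation. The sketch below copies the constants and the calculatePort method from the listing above into a standalone class (PortLayoutSketch is a hypothetical name) and prints the resulting layout for each node.

```java
// Illustrative only: the constants and calculatePort are copied from the tutorial listing.
final class PortLayoutSketch
{
    static final int PORT_BASE = 9000;
    static final int PORTS_PER_NODE = 100;
    static final int ARCHIVE_CONTROL_REQUEST_PORT_OFFSET = 1;
    static final int ARCHIVE_CONTROL_RESPONSE_PORT_OFFSET = 2;
    static final int CLIENT_FACING_PORT_OFFSET = 3;
    static final int MEMBER_FACING_PORT_OFFSET = 4;
    static final int LOG_PORT_OFFSET = 5;
    static final int TRANSFER_PORT_OFFSET = 6;
    static final int LOG_CONTROL_PORT_OFFSET = 7;

    static int calculatePort(final int nodeId, final int offset)
    {
        return PORT_BASE + (nodeId * PORTS_PER_NODE) + offset;
    }

    public static void main(final String[] args)
    {
        for (int nodeId = 0; nodeId < 3; nodeId++)
        {
            System.out.println(
                "node " + nodeId +
                ": archive-request=" + calculatePort(nodeId, ARCHIVE_CONTROL_REQUEST_PORT_OFFSET) +
                ", client-facing=" + calculatePort(nodeId, CLIENT_FACING_PORT_OFFSET) +
                ", member-facing=" + calculatePort(nodeId, MEMBER_FACING_PORT_OFFSET) +
                ", log-control=" + calculatePort(nodeId, LOG_CONTROL_PORT_OFFSET));
        }

        System.out.println(calculatePort(2, ARCHIVE_CONTROL_REQUEST_PORT_OFFSET)); // prints 9201
    }
}
```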

To start the cluster node we are going to define our own class that has a main method; it will construct all of the contexts for the necessary components and then start them all. As indicated above, we are going to give each node of the cluster a unique id (0, 1, and 2). We are also going to use this value as the cluster member id.

public class BasicAuctionClusteredServiceNode
public static void main(final String[] args)
{
    final int nodeId = parseInt(System.getProperty("aeron.tutorial.cluster.nodeId"));    // (1)

    final List<String> hostnames = Arrays.asList("localhost", "localhost", "localhost"); // (2)
    final String hostname = hostnames.get(nodeId);

    final File baseDir = new File(System.getProperty("user.dir"), "node" + nodeId);      // (3)
    final String aeronDirName = CommonContext.getAeronDirectoryName() + "-" + nodeId + "-driver";

    final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();                   // (4)
  1. Pass in the node id as a system property on the command line so that we can reuse the code for each instance of the service.

  2. Define a set of hostnames for the cluster. For now everything is running on localhost, but we could put actual hostnames or IP addresses in this list and run the same example across multiple servers.

  3. For each node we need a location on disk that will hold the persistent data. This will include the Raft log, mark files for each service component, and the recording log used to track snapshots within the log. In a production system this location is likely to be fairly important, as you will want to map it to a fast disk in order to get the best performance out of the system.

  4. Create a shutdown barrier that will be used to trap exit signals and allow the service to exit cleanly.

4.2. Configuration

4.2.1. Media Driver

final MediaDriver.Context mediaDriverContext = new MediaDriver.Context()
    .aeronDirectoryName(aeronDirName)
    .threadingMode(ThreadingMode.SHARED)
    .termBufferSparseFile(true)
    .multicastFlowControlSupplier(new MinMulticastFlowControlSupplier())
    .terminationHook(barrier::signal)
    .errorHandler(BasicAuctionClusteredServiceNode.errorHandler("Media Driver"));

Note we’ve specified the custom aeronDirName to allow multiple Media Drivers on the same host. There is nothing special in the configuration in the Media Driver specific to Cluster.

4.2.2. Archive

final Archive.Context archiveContext = new Archive.Context()
    .aeronDirectoryName(aeronDirName)
    .archiveDir(new File(baseDir, "archive"))
    .controlChannel(udpChannel(nodeId, "localhost", ARCHIVE_CONTROL_REQUEST_PORT_OFFSET))
    .localControlChannel("aeron:ipc?term-length=64k")
    .recordingEventsEnabled(false)
    .threadingMode(ArchiveThreadingMode.SHARED);

Again, there is nothing in the Archive configuration that is specific to Cluster. We’ve used the same aeronDirName as used by the Media Driver. The controlChannel uses a node-specific port in the construction of its UDP channel.

4.3. Archive Client

final AeronArchive.Context aeronArchiveContext = new AeronArchive.Context()
    .controlRequestChannel(archiveContext.controlChannel())
    .controlRequestStreamId(archiveContext.controlStreamId())
    .controlResponseChannel(udpChannel(nodeId, "localhost", ARCHIVE_CONTROL_RESPONSE_PORT_OFFSET))
    .aeronDirectoryName(aeronDirName);

Because Cluster’s Consensus Module requires that we write a log to support the Raft protocol, it needs a client for the Archive we have just configured. The client needs a channel to receive responses back from the Archive, so again we are using a node-specific port to prevent collisions.

4.4. Consensus Module

final ConsensusModule.Context consensusModuleContext = new ConsensusModule.Context()
    .errorHandler(errorHandler("Consensus Module"))
    .clusterMemberId(nodeId)                                                         // (1)
    .clusterMembers(clusterMembers(hostnames))                                       // (2)
    .aeronDirectoryName(aeronDirName)                                                // (3)
    .clusterDir(new File(baseDir, "consensus-module"))                               // (4)
    .ingressChannel("aeron:udp?term-length=64k")                                     // (5)
    .logChannel(logControlChannel(nodeId, hostname, LOG_CONTROL_PORT_OFFSET))        // (6)
    .archiveContext(aeronArchiveContext.clone());                                    // (7)

This is the first of the two configuration sections that are specific to Cluster. The Consensus Module is responsible for handling the main aspects of the Raft protocol, e.g. leader election, ensuring consensus-based consistency of the data in the log, and passing properly replicated messages on to the Clustered Service Container.

  1. Each Consensus Module needs an identifier within the Cluster, we are going to use the nodeId that we have used to separate each of the nodes.

  2. This is probably the trickiest part of the configuration. It specifies all of the static members of the cluster along with all of the endpoints that they require for the various operations of the Consensus Module. These are encoded into a single string of the form:

    0,client-facing:port,member-facing:port,log:port,transfer:port,archive:port|\
    1,client-facing:port,member-facing:port,log:port,transfer:port,archive:port|...

    Where each of the leading numeric values is a member id for the cluster (as specified in the clusterMemberId method). The values for each endpoint represent:

    • client-facing, where the client will connect to for the ingress channel.

    • member-facing, where other members of the cluster will connect to.

    • log, used to replicate logs to.

    • transfer, used for a stream that members can use to catch up to a leader.

    • archive, the same endpoint used to control the archive running on this node.

      In our example we’ve used the same hostname for each of the endpoints (localhost), however each endpoint allows the specification of a host, so that traffic could potentially be separated if required, e.g. running member-facing traffic on a separate network to the client-facing traffic.

  3. Use the node’s specific media driver.

  4. Specify the data directory for the Consensus Module. Make sure it is node specific.

  5. Define the ingress channel for the cluster. Note that this value does not need to be the full channel URI. It can be a template that specifies the parameters but excludes the endpoint, which will be filled in using the value from the clusterMembers string as appropriate for this node.

  6. Specify the log channel. Like the ingress channel, this is specified as a template. This channel needs to be a multi-destination cast or multicast channel. In our example we are using multi-destination cast with manual configuration, hence we need a separate control endpoint (including a port) used for adding and removing destinations on the publication.

  7. Clone the Archive Client context that the Consensus Module will use to talk to the Archive.
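
The member string described in step 2 is easy to get wrong by hand, so it is usually generated. The following is a minimal sketch of a helper that builds it from the list of hostnames, reusing the port offsets defined earlier. It is an assumption about how the clusterMembers(hostnames) call in the listing might be implemented, not a copy of the tutorial source, and ClusterMembersSketch is a hypothetical class name.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: one possible way to build the cluster members string.
final class ClusterMembersSketch
{
    static final int PORT_BASE = 9000;
    static final int PORTS_PER_NODE = 100;
    static final int ARCHIVE_CONTROL_REQUEST_PORT_OFFSET = 1;
    static final int CLIENT_FACING_PORT_OFFSET = 3;
    static final int MEMBER_FACING_PORT_OFFSET = 4;
    static final int LOG_PORT_OFFSET = 5;
    static final int TRANSFER_PORT_OFFSET = 6;

    static int calculatePort(final int nodeId, final int offset)
    {
        return PORT_BASE + (nodeId * PORTS_PER_NODE) + offset;
    }

    /** Builds "id,client-facing,member-facing,log,transfer,archive" per member, joined with '|'. */
    static String clusterMembers(final List<String> hostnames)
    {
        final int[] offsets =
        {
            CLIENT_FACING_PORT_OFFSET,
            MEMBER_FACING_PORT_OFFSET,
            LOG_PORT_OFFSET,
            TRANSFER_PORT_OFFSET,
            ARCHIVE_CONTROL_REQUEST_PORT_OFFSET
        };

        final StringBuilder sb = new StringBuilder();
        for (int i = 0; i < hostnames.size(); i++)
        {
            if (i > 0)
            {
                sb.append('|');
            }

            sb.append(i); // the member id doubles as the node id
            for (final int offset : offsets)
            {
                sb.append(',').append(hostnames.get(i)).append(':').append(calculatePort(i, offset));
            }
        }

        return sb.toString();
    }

    public static void main(final String[] args)
    {
        System.out.println(clusterMembers(Arrays.asList("localhost", "localhost", "localhost")));
    }
}
```

Generating the string from a single list of hostnames keeps the member ids, hosts and ports consistent across all three nodes, since every node must be given the same value.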

4.4.1. Clustered Service Container

The final part of the configuration is the component that will actually run our application logic. It is possible to have multiple Clustered Service Containers per Consensus Module and have them talk to each other using IPC. In our simplified case we just have the one.

final ClusteredServiceContainer.Context clusteredServiceContext =
    new ClusteredServiceContainer.Context()
    .aeronDirectoryName(aeronDirName)                         // (1)
    .archiveContext(aeronArchiveContext.clone())              // (2)
    .clusterDir(new File(baseDir, "service"))
    .clusteredService(new BasicAuctionClusteredService())     // (3)
    .errorHandler(errorHandler("Clustered Service"));
  1. Again we use the node specific Media Driver.

  2. And the node’s Archive, via the Archive Client configuration.

  3. This is the point where we bind an instance of our application logic to the cluster.

4.5. Running the Cluster

Now that the cluster is configured we can start it running. The code for launching the service is as follows.

try (
    ClusteredMediaDriver clusteredMediaDriver = ClusteredMediaDriver.launch(
        mediaDriverContext, archiveContext, consensusModuleContext);  // (1)
    ClusteredServiceContainer container = ClusteredServiceContainer.launch(
        clusteredServiceContext))                                     // (2)
{
    System.out.println("[" + nodeId + "] Started Cluster Node on " + hostname + "...");
    barrier.await();                                                  // (3)
    System.out.println("[" + nodeId + "] Exiting");
}
  1. Launches a ClusteredMediaDriver that includes instances of the Media Driver, Archive and Consensus Module.

  2. Immediately afterward we launch a ClusteredServiceContainer which is our application.

  3. Use the shutdown barrier to keep our system live until it is signalled to shut down (e.g. using SIGTERM or SIGINT on Unix).

There is a script provided to launch the cluster. Assuming that you’ve followed the Using the Source From this Tutorial step above you should be able to do the following:
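
The barrier idea in step 3 can be sketched with a CountDownLatch. This is a simplified stand-in for illustration, not the ShutdownSignalBarrier used in the listing: it has no operating system signal handling and is released by another thread calling signal(), in the same way that the Media Driver's termination hook is wired to barrier::signal earlier in this tutorial.

```java
import java.util.concurrent.CountDownLatch;

// Illustrative only: a simplified stand-in without OS signal handling.
final class ShutdownBarrierSketch
{
    private final CountDownLatch latch = new CountDownLatch(1);

    /** Releases any thread blocked in await(). Safe to call more than once. */
    void signal()
    {
        latch.countDown();
    }

    /** Blocks the calling thread until signal() has been called. */
    void await()
    {
        try
        {
            latch.await();
        }
        catch (final InterruptedException ex)
        {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    boolean isSignalled()
    {
        return latch.getCount() == 0;
    }

    public static void main(final String[] args)
    {
        final ShutdownBarrierSketch barrier = new ShutdownBarrierSketch();

        // Stand-in for an exit request arriving from elsewhere, e.g. a termination hook.
        new Thread(barrier::signal).start();

        barrier.await();
        System.out.println("Exiting");
    }
}
```

Blocking the main thread inside the try-with-resources block is what keeps the launched components open; once the barrier is released, the block exits and each component is closed in reverse order.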

> cd <AERON_HOME>/aeron-samples/scripts/cluster
> ./basic-auction-cluster

This will spit out a whole lot of logging but should end with something like:

CLUSTER: ELECTION_STATE_CHANGE, FOLLOWER_TRANSITION -> FOLLOWER_READY, memberId=2
CLUSTER: ELECTION_STATE_CHANGE, FOLLOWER_CATCHUP -> FOLLOWER_TRANSITION, memberId=1
CLUSTER: ELECTION_STATE_CHANGE, FOLLOWER_TRANSITION -> FOLLOWER_READY, memberId=1
CLUSTER: ELECTION_STATE_CHANGE, FOLLOWER_READY -> CLOSE, memberId=1
CLUSTER: ELECTION_STATE_CHANGE, FOLLOWER_READY -> CLOSE, memberId=2
CLUSTER: ELECTION_STATE_CHANGE, LEADER_READY -> CLOSE, memberId=0

We now have our new service running in a cluster.

You can shut down the nodes using the following:

> pkill -f BasicAuctionClusteredServiceNode

You should also be able to see the stored data for each of the nodes in the working directory.

> ls
basic-auction-client  basic-auction-cluster  logs  node0  node1  node2  script-common

Let’s look briefly at how the script launches the code.

function startNode() {
    ${JAVA_HOME}/bin/java \
        -cp ../../../aeron-all/build/libs/aeron-all-${VERSION}.jar \
        -javaagent:../../../aeron-agent/build/libs/aeron-agent-${VERSION}-all.jar \
        -XX:BiasedLockingStartupDelay=0 \
        -XX:+UnlockExperimentalVMOptions \
        -XX:+TrustFinalNonStaticFields \
        -XX:+UnlockDiagnosticVMOptions \
        -XX:GuaranteedSafepointInterval=300000 \
        -XX:+UseParallelOldGC \
        -Daeron.event.cluster.log=all \
        -Daeron.tutorial.cluster.nodeId=$1 \
        ${JVM_OPTS} ${ADD_OPENS} \
        io.aeron.samples.tutorial.cluster.BasicAuctionClusteredServiceNode > logs/cluster-$1.log &
}

function startAll() {
  startNode 0
  startNode 1
  startNode 2

  tail -F logs/cluster-*.log
}

The -javaagent:../../../aeron-agent/build/libs/aeron-agent-${VERSION}-all.jar option allows Aeron’s logging agent to be woven into the running code. Specifying -Daeron.event.cluster.log=all tells the agent to log all of the events relating to cluster operation.

5. Using a Cluster Client

While we now have our cluster up and running, we can’t do anything with it until we have a client that will interact with the cluster.

5.1. Connecting to a Cluster

First off we need a way to connect our client to the cluster. All of the messaging between the client and the cluster happens via Aeron, but we have an additional layer called AeronCluster that handles connections to the independent nodes and provides methods for offering messages into the cluster and polling for responses.

public class BasicAuctionClusterClient implements EgressListener
final int egressPort = 19000 + customerId;

try (
    MediaDriver mediaDriver = MediaDriver.launchEmbedded(new MediaDriver.Context()  // (1)
        .threadingMode(ThreadingMode.SHARED)
        .dirDeleteOnStart(true)
        .dirDeleteOnShutdown(true));
    AeronCluster aeronCluster = AeronCluster.connect(new AeronCluster.Context()
        .egressListener(client)                                                     // (2)
        .egressChannel("aeron:udp?endpoint=localhost:" + egressPort)                // (3)
        .aeronDirectoryName(mediaDriver.aeronDirectoryName())
        .ingressChannel("aeron:udp")                                                // (4)
        .clusterMemberEndpoints(clusterMembers)))                                   // (5)
{
  1. Launch a media driver that will allow communication with the cluster nodes. We’re using an embedded instance here to make it easier to launch multiple drivers on the same machine for use in this tutorial, but it would be perfectly viable to launch a single media driver and have multiple clients on the same host sharing the same media driver. We’re also avoiding using the media drivers that each of the cluster nodes are using.

  2. This is where we bind our client application code to the responses from the cluster. This requires implementing the EgressListener interface, which provides callbacks for session messages, session related events (e.g. errors), and cluster events (e.g. new elected leaders).

  3. Specify the channel to receive egress responses back from the cluster. In our case we are using a UDP unicast response, which will limit responses to the session that sent the ingress message. We are adjusting the port per client to allow for multiple clients on the same machine with multiple media drivers. If a single media driver was used then this would not be necessary.

    It is possible to specify a multicast address here if you wanted all clients to see all responses. Some systems may find this useful, e.g. an exchange with multiple gateways may wish to have the execution for a trade sent to a client that was on the passive side of a trade.

  4. Specify the ingress channel for the cluster. In our case as we are using multiple destinations we need to use the template style approach that was used when setting up the Consensus Module. We just identify that access to the cluster is via UDP.

  5. Identify the actual static endpoints for the cluster. These will be merged into the configuration specified for the ingress channel.

Note
This example uses UDP unicast for the ingress and egress channels. It is possible to use multicast as well, but you will need to be in an environment where it is well supported. In cloud deployments UDP multicast is either not supported or in its very early stages, so unicast is currently recommended in those environments.
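
The value passed to clusterMemberEndpoints is a comma-separated list of memberId=host:port pairs, one per node, pointing at each node's client-facing endpoint. The following is a minimal sketch of building such a string. The class name, method name and port constants are assumptions made for illustration; the port scheme (base 9000, 100 ports per node, offset 3) is inferred from the clientFacingEndpoint values in the list-members output shown later in this tutorial.

```java
import java.util.List;

final class ClusterEndpointsSketch
{
    // Assumed port scheme, inferred from the list-members output (9003, 9103, 9203).
    static final int PORT_BASE = 9000;
    static final int PORTS_PER_NODE = 100;
    static final int CLIENT_FACING_PORT_OFFSET = 3;

    // Builds "0=host:port,1=host:port,..." where the list index is used as the member id.
    static String clientFacingEndpoints(final List<String> hostnames)
    {
        final StringBuilder sb = new StringBuilder();
        for (int memberId = 0; memberId < hostnames.size(); memberId++)
        {
            if (memberId > 0)
            {
                sb.append(',');
            }
            final int port = PORT_BASE + (memberId * PORTS_PER_NODE) + CLIENT_FACING_PORT_OFFSET;
            sb.append(memberId).append('=').append(hostnames.get(memberId)).append(':').append(port);
        }
        return sb.toString();
    }

    public static void main(final String[] args)
    {
        // prints 0=localhost:9003,1=localhost:9103,2=localhost:9203
        System.out.println(clientFacingEndpoints(List.of("localhost", "localhost", "localhost")));
    }
}
```

The resulting string would be the clusterMembers value used in the listing above.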

5.2. Publishing to the Cluster

Next we are going to push messages into the cluster itself. So that the client can fail over to different nodes as nodes come and go in the cluster, the Aeron publications are wrapped in a class called AeronCluster. This is the second of the two items that are connected at start up. This class provides an API very similar to the Publication API.

private long sendBid(final AeronCluster aeronCluster, final long price)
{
    final long correlationId = this.correlationId++;
    actionBidBuffer.putLong(CORRELATION_ID_OFFSET, correlationId);            // (1)
    actionBidBuffer.putLong(CUSTOMER_ID_OFFSET, customerId);
    actionBidBuffer.putLong(PRICE_OFFSET, price);

    while (aeronCluster.offer(actionBidBuffer, 0, BID_MESSAGE_LENGTH) < 0)    // (2)
    {
        idleStrategy.idle(aeronCluster.pollEgress());                         // (3)
    }

    return correlationId;
}
  1. In much the same way that we would send messages to a publication, we put the data that we want to send into an Agrona DirectBuffer. The AeronCluster contains offer methods that behave in the same way as the Publication offer methods.

  2. Publish the data in the same way we would using a Publication. We must check the return value to ensure that the message has been sent.

  3. If we fail to send and need to run a busy wait loop, we should utilise an IdleStrategy to ensure that we don’t inappropriately overuse the CPU. With cluster clients we should also be continually polling the egress topic to pick up any messages that have been sent back from the cluster, including errors or session status messages.
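
The offset constants used in sendBid are not shown in the listing above. A minimal sketch of one possible fixed layout follows, assuming three consecutive 8-byte fields. The offset values, the little-endian byte order and the class name are assumptions, and java.nio.ByteBuffer stands in for the Agrona buffer so that the example runs without any dependencies.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class BidMessageLayoutSketch
{
    // Assumed layout: correlationId, customerId and price as consecutive longs.
    static final int CORRELATION_ID_OFFSET = 0;
    static final int CUSTOMER_ID_OFFSET = CORRELATION_ID_OFFSET + Long.BYTES;
    static final int PRICE_OFFSET = CUSTOMER_ID_OFFSET + Long.BYTES;
    static final int BID_MESSAGE_LENGTH = PRICE_OFFSET + Long.BYTES;

    // Writes the three fields at their absolute offsets, mirroring the putLong calls in sendBid.
    static ByteBuffer encodeBid(final long correlationId, final long customerId, final long price)
    {
        final ByteBuffer buffer = ByteBuffer.allocate(BID_MESSAGE_LENGTH).order(ByteOrder.LITTLE_ENDIAN);
        buffer.putLong(CORRELATION_ID_OFFSET, correlationId);
        buffer.putLong(CUSTOMER_ID_OFFSET, customerId);
        buffer.putLong(PRICE_OFFSET, price);
        return buffer;
    }

    public static void main(final String[] args)
    {
        final ByteBuffer bid = encodeBid(8474093274723075706L, 10L, 105L);
        // Read the fields back using the same offsets the receiver would use.
        System.out.println(
            "length=" + BID_MESSAGE_LENGTH +
            " correlationId=" + bid.getLong(CORRELATION_ID_OFFSET) +
            " customerId=" + bid.getLong(CUSTOMER_ID_OFFSET) +
            " price=" + bid.getLong(PRICE_OFFSET));
    }
}
```

Because every field sits at a fixed offset, the sender and the clustered service only need to agree on these constants; no per-message parsing state is required.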

5.3. Receiving Responses

As mentioned previously, our client must have a class that implements the EgressListener interface to receive messages that come back from the cluster. The responses can be application messages, session events (e.g. error messages), or notifications of a new leader. In this tutorial there is not a lot to do, so we simply print out the responses that we receive.

public void onMessage(
    final long clusterSessionId,
    final long timestamp,
    final DirectBuffer buffer,
    final int offset,
    final int length,
    final Header header)
{
    final long correlationId = buffer.getLong(offset + CORRELATION_ID_OFFSET);
    final long customerId = buffer.getLong(offset + CUSTOMER_ID_OFFSET);
    final long currentPrice = buffer.getLong(offset + PRICE_OFFSET);
    final boolean bidSucceed = 0 != buffer.getByte(offset + BID_SUCCEEDED_OFFSET);

    lastBidSeen = currentPrice;

    printOutput(
        "SessionMessage(" + clusterSessionId + "," + correlationId + "," +
        customerId + "," + currentPrice + "," + bidSucceed + ")");
}

public void sessionEvent(
    final long correlationId,
    final long clusterSessionId,
    final long leadershipTermId,
    final int leaderMemberId,
    final EventCode code,
    final String detail)
{
    printOutput(
        "SessionEvent(" + correlationId + "," + leadershipTermId + "," +
        leaderMemberId + "," + code + "," + detail + ")");
}

public void newLeader(
    final long clusterSessionId,
    final long leadershipTermId,
    final int leaderMemberId,
    final String memberEndpoints)
{
    printOutput(
        "New Leader(" + clusterSessionId + "," + leadershipTermId + "," + leaderMemberId + ")");
}

5.4. Using the Cluster

Now that we have all of the pieces in place we can start to run the cluster and client together and see how the cluster behaves. You will need at least three terminal windows, as we will leave the one that we use to start the cluster open to show logging information for the cluster.

Firstly start the cluster up.

> cd <AERON_HOME>/aeron-samples/scripts/cluster
> ./basic-auction-cluster

Then wait until you see a message that indicates one of the nodes is now the leader:

[57240.190127954] CLUSTER: ELECTION_STATE_CHANGE [29/29]: memberId=1, LEADER_READY -> CLOSE

Now in a second terminal window run a client that will place some bids into our cluster hosted auction. The 10 parameter is the customer id to use for the client; change this number if you want to run multiple clients.

> cd <AERON_HOME>/aeron-samples/scripts/cluster
> ./basic-auction-client 10

We should see some output similar to the following:

Sent (8474093274723075706,10,105) bidsRemaining = 9
SessionMessage(1,8474093274723075706,10,105,true)
Sent (8474093274723075707,10,109) bidsRemaining = 8
SessionMessage(1,8474093274723075707,10,109,true)
Sent (8474093274723075708,10,110) bidsRemaining = 7
SessionMessage(1,8474093274723075708,10,110,true)
Sent (8474093274723075709,10,119) bidsRemaining = 6
SessionMessage(1,8474093274723075709,10,119,true)
Sent (8474093274723075710,10,122) bidsRemaining = 5
SessionMessage(1,8474093274723075710,10,122,true)
Sent (8474093274723075711,10,127) bidsRemaining = 4
SessionMessage(1,8474093274723075711,10,127,true)
Sent (8474093274723075712,10,129) bidsRemaining = 3
SessionMessage(1,8474093274723075712,10,129,true)
Sent (8474093274723075713,10,135) bidsRemaining = 2
SessionMessage(1,8474093274723075713,10,135,true)
Sent (8474093274723075714,10,136) bidsRemaining = 1
SessionMessage(1,8474093274723075714,10,136,true)
Sent (8474093274723075715,10,142) bidsRemaining = 0
SessionMessage(1,8474093274723075715,10,142,true)

5.5. Failed Nodes and Leader Election

Now that the cluster is up and running and we are able to send and receive messages, we can look at some of the additional tools that are available for Cluster and experiment with some of the failover functionality.

The first thing that we are interested in is which of the three nodes is the leader. With this information we can try to shut down the leader and see if our client is able to recover.

> java -cp <AERON_HOME>/build/libs/aeron-all-[aeronVersion].jar \
  io.aeron.cluster.ClusterTool node0/consensus-module list-members

This will show a list of all of the members in the cluster, similar to the following:

currentTimeNs=1579564309666000000, leaderMemberId=0, memberId=0, activeMembers=[ClusterMember{isBallotSent=false,
isLeader=false, hasRequestedJoin=false, id=0, leadershipTermId=5, logPosition=11872, candidateTermId=-1,
catchupReplaySessionId=-1, correlationId=-1, removalPosition=-1, timeOfLastAppendPositionNs=1579564309666000000,
clientFacingEndpoint='localhost:9003', memberFacingEndpoint='localhost:9004', logEndpoint='localhost:9005',
transferEndpoint='localhost:9006', archiveEndpoint='localhost:9001',
endpointsDetail='localhost:9003,localhost:9004,localhost:9005,localhost:9006,localhost:9001', publication=null,
vote=null}, ClusterMember{isBallotSent=false, isLeader=false, hasRequestedJoin=false, id=1, leadershipTermId=5,
logPosition=11872, candidateTermId=-1, catchupReplaySessionId=-1, correlationId=-1, removalPosition=-1,
timeOfLastAppendPositionNs=1579564309495000000, clientFacingEndpoint='localhost:9103',
memberFacingEndpoint='localhost:9104', logEndpoint='localhost:9105', transferEndpoint='localhost:9106',
archiveEndpoint='localhost:9101',
endpointsDetail='localhost:9103,localhost:9104,localhost:9105,localhost:9106,localhost:9101', publication=null,
vote=null}, ClusterMember{isBallotSent=false, isLeader=false, hasRequestedJoin=false, id=2, leadershipTermId=5,
logPosition=11872, candidateTermId=-1, catchupReplaySessionId=-1, correlationId=-1, removalPosition=-1,
timeOfLastAppendPositionNs=1579564309552000000, clientFacingEndpoint='localhost:9203',
memberFacingEndpoint='localhost:9204', logEndpoint='localhost:9205', transferEndpoint='localhost:9206',
archiveEndpoint='localhost:9201',
endpointsDetail='localhost:9203,localhost:9204,localhost:9205,localhost:9206,localhost:9201', publication=null,
vote=null}], passiveMembers=[]

The part we are interested in is the leaderMemberId, which allows us to identify the leader in order to shut it down. We can discover the pid for that member using the following command:

> java -cp <AERON_HOME>/build/libs/aeron-all-[aeronVersion].jar \
  io.aeron.cluster.ClusterTool node<leaderMemberId>/consensus-module pid
15060

Note that the first argument to the ClusterTool is the directory where the Consensus Module’s data is stored, and we have substituted the leaderMemberId to locate the correct directory for the leader.

This will have printed out a simple number which is the pid of the leader process.

Next start the client again, but this time we will let it run a bit longer and attempt to trigger a leader election within the cluster while it is running.

> cd <AERON_HOME>/aeron-samples/scripts/cluster
> ./basic-auction-client 10 1000 3000

Once the client is running, switch to a different terminal and kill the leader process using the pid from the earlier step.

> kill <pid>

The system will stall for a short while until the followers time out the leader and decide to elect a new one. The logs for the cluster will contain something similar to:

==> logs/cluster-0.log <==
[0] Exiting
Consensus Module
io.aeron.cluster.client.ClusterException: heartbeat timeout from leader
        at io.aeron.cluster.ConsensusModuleAgent.slowTickWork(ConsensusModuleAgent.java:1884)
        at io.aeron.cluster.ConsensusModuleAgent.doWork(ConsensusModuleAgent.java:288)
        at org.agrona.concurrent.AgentRunner.doDutyCycle(AgentRunner.java:283)
        at org.agrona.concurrent.AgentRunner.run(AgentRunner.java:164)
        at java.base/java.lang.Thread.run(Thread.java:834)
Consensus Module
io.aeron.cluster.client.ClusterException: heartbeat timeout from leader
        at io.aeron.cluster.ConsensusModuleAgent.slowTickWork(ConsensusModuleAgent.java:1884)
        at io.aeron.cluster.ConsensusModuleAgent.doWork(ConsensusModuleAgent.java:288)
        at org.agrona.concurrent.AgentRunner.doDutyCycle(AgentRunner.java:283)
        at org.agrona.concurrent.AgentRunner.run(AgentRunner.java:164)
        at java.base/java.lang.Thread.run(Thread.java:834)
...
[65188.006731058] CLUSTER: ELECTION_STATE_CHANGE [42/42]: memberId=1, FOLLOWER_BALLOT -> FOLLOWER_REPLAY
[65188.006791979] CLUSTER: ELECTION_STATE_CHANGE [54/54]: memberId=1, FOLLOWER_REPLAY -> FOLLOWER_CATCHUP_TRANSITION
[65188.009951316] CLUSTER: ELECTION_STATE_CHANGE [55/55]: memberId=1, FOLLOWER_CATCHUP_TRANSITION -> FOLLOWER_CATCHUP
[65188.028244817] CLUSTER: ELECTION_STATE_CHANGE [47/47]: memberId=1, FOLLOWER_CATCHUP -> FOLLOWER_TRANSITION
[65188.028355173] CLUSTER: ELECTION_STATE_CHANGE [45/45]: memberId=1, FOLLOWER_TRANSITION -> FOLLOWER_READY
[65188.02995347] CLUSTER: ELECTION_STATE_CHANGE [31/31]: memberId=1, FOLLOWER_READY -> CLOSE

==> logs/cluster-2.log <==
[65188.033401987] CLUSTER: ELECTION_STATE_CHANGE [29/29]: memberId=2, LEADER_READY -> CLOSE

This shows that a new leader has been elected to run the cluster. At this point the cluster has only two working nodes and no redundancy: losing another node would leave it without a quorum.
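
The loss of redundancy follows from the quorum rule given in the introduction, ⌊n/2⌋ + 1. The short sketch below works through that arithmetic for a three-node cluster; the class and method names are hypothetical and not part of Aeron.

```java
final class QuorumSketch
{
    // Quorum for a cluster of the given configured size: floor(n / 2) + 1.
    static int quorum(final int clusterSize)
    {
        return (clusterSize / 2) + 1;
    }

    // True while enough nodes are alive for the cluster to keep making progress.
    static boolean hasQuorum(final int clusterSize, final int liveNodes)
    {
        return liveNodes >= quorum(clusterSize);
    }

    // Number of additional node failures that can occur before quorum is lost.
    static int furtherFailuresTolerated(final int clusterSize, final int liveNodes)
    {
        return Math.max(0, liveNodes - quorum(clusterSize));
    }

    public static void main(final String[] args)
    {
        final int clusterSize = 3;
        System.out.println("quorum=" + quorum(clusterSize));
        System.out.println("all nodes up, further failures tolerated=" +
            furtherFailuresTolerated(clusterSize, 3));
        System.out.println("leader killed, further failures tolerated=" +
            furtherFailuresTolerated(clusterSize, 2));
        System.out.println("one node left, hasQuorum=" + hasQuorum(clusterSize, 1));
    }
}
```

With three configured members the quorum is two, so the cluster keeps running after the leader is killed but cannot survive a second failure until the stopped node is restored.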

The client should have automatically resumed sending messages into the cluster and receiving responses. There may or may not be an exception logged by the client depending on timing.

Sent (-7819190107498904370,10,12969) bidsRemaining = 980
SessionMessage(20,-7819190107498904370,10,12969,true)
Sent (-7819190107498904369,10,12969) bidsRemaining = 979
Sent (-7819190107498904368,10,12970) bidsRemaining = 978
Sent (-7819190107498904367,10,12978) bidsRemaining = 977
Sent (-7819190107498904366,10,12975) bidsRemaining = 976
Sent (-7819190107498904365,10,12971) bidsRemaining = 975
Consensus Module
io.aeron.cluster.client.ClusterException: heartbeat timeout from leader
        at io.aeron.cluster.ConsensusModuleAgent.slowTickWork(ConsensusModuleAgent.java:1884)
        at io.aeron.cluster.ConsensusModuleAgent.doWork(ConsensusModuleAgent.java:288)
        at org.agrona.concurrent.AgentRunner.doDutyCycle(AgentRunner.java:283)
        at org.agrona.concurrent.AgentRunner.run(AgentRunner.java:164)
        at java.lang.Thread.run(Thread.java:748)
New Leader(20,7,1)
Sent (-7819190107498904364,10,12977) bidsRemaining = 974
Sent (-7819190107498904363,10,12974) bidsRemaining = 973
SessionMessage(20,-7819190107498904364,10,12977,true)
SessionMessage(20,-7819190107498904363,10,12977,false)
Important
If you look carefully at the correlationId values you will notice there are no SessionMessage log entries for a number of the messages that were sent. This is because when the leader fails, some of the messages being sent into the cluster can be lost. For this reason it is important that the client tracks responses to its messages and provides timeouts to the caller if required.
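
One way to track responses is to record a deadline for each correlationId when it is sent, clear it when the matching response arrives, and periodically sweep for entries that have expired. The following is a minimal sketch under those assumptions. PendingBidTracker is a hypothetical helper, not part of Aeron or the tutorial's sample code, and it assumes it is only called from the single thread that sends bids and polls egress. Time is passed in explicitly so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PendingBidTracker
{
    private final Map<Long, Long> deadlineNsByCorrelationId = new LinkedHashMap<>();
    private final long timeoutNs;

    PendingBidTracker(final long timeoutNs)
    {
        this.timeoutNs = timeoutNs;
    }

    // Call after a message has been successfully offered to the cluster.
    void onSent(final long correlationId, final long nowNs)
    {
        deadlineNsByCorrelationId.put(correlationId, nowNs + timeoutNs);
    }

    // Call from the egress message callback; returns false for unknown or already expired ids.
    boolean onResponse(final long correlationId)
    {
        return deadlineNsByCorrelationId.remove(correlationId) != null;
    }

    // Call periodically from the polling loop; removes and returns ids whose deadline has passed.
    List<Long> expire(final long nowNs)
    {
        final List<Long> expired = new ArrayList<>();
        final Iterator<Map.Entry<Long, Long>> it = deadlineNsByCorrelationId.entrySet().iterator();
        while (it.hasNext())
        {
            final Map.Entry<Long, Long> entry = it.next();
            if (nowNs - entry.getValue() >= 0)
            {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    int pendingCount()
    {
        return deadlineNsByCorrelationId.size();
    }

    public static void main(final String[] args)
    {
        final PendingBidTracker tracker = new PendingBidTracker(1_000L);
        tracker.onSent(1L, 0L);
        tracker.onSent(2L, 0L);
        tracker.onSent(3L, 500L);

        System.out.println("response for 1 matched: " + tracker.onResponse(1L));
        System.out.println("expired at t=1000: " + tracker.expire(1_000L));
        System.out.println("still pending: " + tracker.pendingCount());
    }
}
```

What to do with an expired correlationId (report the timeout to the caller, or resend the bid) is an application decision. A timed-out message may still have been committed by the cluster before the leader failed, so a resend is only safe if the application logic can cope with duplicates.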

We can restore the failed node back into service:

> cd <AERON_HOME>/aeron-samples/scripts/cluster
> ./basic-auction-cluster <leaderMemberId>

Here the leaderMemberId is the id of the member that was killed (not the new leader of the cluster). The cluster logs will show the member rejoining the cluster:

[0] Started Cluster Node on localhost...
[66250.672741109] CLUSTER: STATE_CHANGE [22/22]: memberId=0, INIT -> ACTIVE
[66250.687237677] CLUSTER: ELECTION_STATE_CHANGE [23/23]: memberId=0, INIT -> CANVASS
[66250.689468615] CLUSTER: NEW_LEADERSHIP_TERM [40/40]: logLeadershipTermId=6, leadershipTermId=6, logPosition=19424, timestamp=1579566538529, leaderMemberId=2, logSessionId=-1889384498
[66250.689493099] CLUSTER: ELECTION_STATE_CHANGE [34/34]: memberId=0, CANVASS -> FOLLOWER_REPLAY
onSessionOpen(ClientSession{id=1, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:19132', encodedPrincipal=[], clusteredServiceAgent=io.aeron.cluster.service.ClusteredServiceAgent@19a05123, responsePublication=null, isClosing=false})
attemptBid(this=Auction{bestPrice=0, currentWinningCustomerId=-1}, price=123,customerId=132)
onSessionClose(ClientSession{id=1, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:19132', encodedPrincipal=[], clusteredServiceAgent=io.aeron.cluster.service.ClusteredServiceAgent@19a05123, responsePublication=null, isClosing=false})
onSessionOpen(ClientSession{id=2, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:19132', encodedPrincipal=[], clusteredServiceAgent=io.aeron.cluster.service.ClusteredServiceAgent@19a05123, responsePublication=null, isClosing=false})

You can see that the restarted node looks like it is re-executing application messages. That is because it is. It is replaying the log of the messages that have been sent into the cluster up to this point in order to restore its in-memory state to what it was before the failure.

5.6. Snapshots

Earlier in the tutorial we looked at how a node can store a snapshot of its current state so that it would not need to replay every single application message since the beginning of time. We can look at the recording log for a node to see the logs and the snapshots that are stored for a node in the cluster. We can also use the tools to trigger a snapshot.

Firstly let’s look at the recording log.

> java -cp <AERON_HOME>/build/libs/aeron-all-[aeronVersion].jar \
  io.aeron.cluster.ClusterTool node0/consensus-module recording-log

Which should show a recording log with a single entry:

RecordingLog{entries=[Entry{recordingId=0, leadershipTermId=0, termBaseLogPosition=0, logPosition=-1, timestamp=1579569542227, serviceId=-1, type=0, isValid=true, entryIndex=0}], cacheIndex={0=0}}

We can trigger a snapshot on the leader. This is slightly different to using the ClusterTool in that the ClusterControl needs the Aeron directory for the node’s media driver.

> java -cp ../../../aeron-all/build/libs/aeron-all-1.25.1-SNAPSHOT.jar \
  -Daeron.dir=/dev/shm/aeron-mike-<leaderMemberId>-driver/ io.aeron.cluster.ClusterControl SNAPSHOT

The recording log will now have additional entries.

RecordingLog{entries=[Entry{recordingId=0, leadershipTermId=0, termBaseLogPosition=0, logPosition=-1, timestamp=1579569542227, serviceId=-1, type=0, isValid=true, entryIndex=0}, Entry{recordingId=1, leadershipTermId=0, termBaseLogPosition=0, logPosition=1376, timestamp=1579570195606, serviceId=0, type=1, isValid=true, entryIndex=1}, Entry{recordingId=2, leadershipTermId=0, termBaseLogPosition=0, logPosition=1376, timestamp=1579570195606, serviceId=-1, type=1, isValid=true, entryIndex=2}], cacheIndex={0=0}}

There are two more entries: one for the snapshot just taken and a new term entry to indicate the point within the log that the service should start recovering from were it to use the new snapshot on restart.

6. Exercises

Here are some exercises that you can try out.

  1. Have multiple customers competing on the same auction.

  2. Use multicast for the egress channel so that customers can see each other’s prices and make better bids.

  3. Implement a second message type that will allow customers to query for the current bid price of the auction.


1. Where stopping could include crashing or manually shutting down