Cluster Tutorial
Important
This tutorial is currently a work in progress; information may be missing or incomplete.
This tutorial assumes that the user already has a basic working knowledge of Aeron Messaging.
Aeron Cluster is a framework for high-performance, in-memory, fault-tolerant services. It implements the Raft Consensus Algorithm to provide log replication, which allows multiple nodes to maintain the same state, and automated leader election, which ensures that there is a single leader within the cluster. Systems built using Aeron Cluster have a linearizable consistency model and can cope with partial system failure, including failure of the leader node, as long as a quorum of ⌊n/2⌋ + 1 nodes remains available.
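As a quick illustration of the quorum arithmetic, the sketch below computes the majority size and the number of node failures a cluster of n members can tolerate. It is plain Java with no Aeron dependency; the class and method names are hypothetical.

```java
// Hypothetical helper illustrating Raft majority arithmetic; not part of Aeron.
final class QuorumMath
{
    private QuorumMath()
    {
    }

    // Majority needed for a cluster of n members: floor(n / 2) + 1.
    // Java integer division truncates toward zero, which equals floor for n >= 0.
    static int quorum(final int memberCount)
    {
        if (memberCount < 1)
        {
            throw new IllegalArgumentException("memberCount must be >= 1");
        }
        return memberCount / 2 + 1;
    }

    // Number of members that can fail while a majority is still available.
    static int toleratedFailures(final int memberCount)
    {
        return memberCount - quorum(memberCount);
    }
}
```

A three-node cluster needs two nodes for a quorum and so tolerates one failure; a four-node cluster needs three and still tolerates only one, which is why odd cluster sizes are the usual choice.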
It is not the intention of this tutorial to give a full description of the Raft Consensus Algorithm; however, it is difficult to explain Aeron Cluster without a few definitions up front. The key concepts are (some are Aeron specific):
-
Node, a physical server, container instance, VM or group of processes that represents a logical server within a cluster.
-
Leader, a node within the cluster responsible for replicating messages to the other nodes and waiting for acknowledgements.
-
Follower, other nodes within the cluster that receive messages replicated from the leader.
-
Election, process by which the cluster agrees on a new leader.
-
Client, node external to the cluster.
-
Ingress, the messages from a client into the cluster.
-
Egress, response messages from the cluster back to a client.
-
Snapshot, serialised representation of the application logic’s state at a point in time.
Within Aeron Cluster, replication (ensuring that a follower node has the same state as the leader) and recovery (restoring a stopped node to its previous state)[1] are the same problem, or at least similar enough that they use the same mechanism. They rely on three things. Firstly, some known initial state: either the state when first provisioned (a 'null' or empty state) or a snapshot of the state at a point in time. Secondly, an ordered log of all input messages, which is handled by Cluster’s Raft implementation. Thirdly, the application logic needs to be deterministic, i.e. it must derive all of its resulting state and output events from the initial state and the input messages, such that the same inputs always produce the same result. This can be trickier than it first seems, as it excludes some functionality that we normally take for granted.
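To make the "initial state plus ordered log" idea concrete, here is a minimal, Aeron-free sketch: a deterministic state machine is rebuilt by applying an ordered log on top of an initial state, so replaying from scratch and replaying from a snapshot plus the remaining log give the same result. The ReplayDemo, Bid and State names are hypothetical, and the "higher price wins" rule is an assumption made for illustration.

```java
import java.util.List;

// Hypothetical illustration: same initial state + same ordered log => same resulting state.
final class ReplayDemo
{
    // An input message in the ordered log (hypothetical shape).
    static final class Bid
    {
        final long customerId;
        final long price;

        Bid(final long customerId, final long price)
        {
            this.customerId = customerId;
            this.price = price;
        }
    }

    static final class State
    {
        long bestPrice;
        long winningCustomerId;

        State(final long bestPrice, final long winningCustomerId)
        {
            this.bestPrice = bestPrice;
            this.winningCustomerId = winningCustomerId;
        }

        // Deterministic: the outcome depends only on the current state and the input,
        // never on clocks, random numbers or external I/O.
        void apply(final Bid bid)
        {
            if (bid.price > bestPrice)
            {
                bestPrice = bid.price;
                winningCustomerId = bid.customerId;
            }
        }
    }

    // Rebuild state by replaying the log on a copy of the initial (empty or snapshot) state.
    static State replay(final State initial, final List<Bid> log)
    {
        final State state = new State(initial.bestPrice, initial.winningCustomerId);
        for (final Bid bid : log)
        {
            state.apply(bid);
        }
        return state;
    }
}
```

Replaying the first part of the log produces a "snapshot", and replaying the rest of the log on top of it must match a full replay; that equivalence is what lets snapshots shorten recovery.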
Tip
Be careful to ensure determinism in the clustered service. Common pitfalls include:
-
Timestamps: use the timestamps provided by Aeron Cluster (shown later).
-
Random numbers: best avoided, or use a seed fixed in the initial state or supplied via a message.
-
Configuration changes: push these into the application logic via messages.
-
Iteration order: iterate over collections in a known order, either by using a collection that preserves ordering (e.g. TreeMap, LinkedHashMap) or by sorting the values from the collection before sending them as messages.
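The iteration-order and random-number pitfalls can be shown in a few lines of plain Java. This sketch (hypothetical names, no Aeron APIs) copies an unordered map into a TreeMap before emitting its entries, and seeds java.util.Random from a value that, by assumption, arrives as part of the initial state or an input message, so every replay draws the same sequence.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

// Hypothetical helpers illustrating two determinism pitfalls.
final class DeterminismPitfalls
{
    // Emit entries in a known order: copy into a TreeMap (sorted by key) before iterating.
    static List<String> orderedEntries(final Map<String, Long> unordered)
    {
        final List<String> out = new ArrayList<>();
        for (final Map.Entry<String, Long> e : new TreeMap<>(unordered).entrySet())
        {
            out.add(e.getKey() + "=" + e.getValue());
        }
        return out;
    }

    // Random values are repeatable under replay only if the seed is part of the replicated input.
    static long[] draws(final long seedFromInput, final int count)
    {
        final Random random = new Random(seedFromInput);
        final long[] values = new long[count];
        for (int i = 0; i < count; i++)
        {
            values[i] = random.nextLong();
        }
        return values;
    }
}
```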
As it may be impractical to replay all data from the "beginning of time", the system should take snapshots periodically, e.g. daily or hourly. The frequency of snapshots should be determined by the volume of data into the system, the throughput of the business logic, and the desired mean time to recovery. It is not uncommon for systems to take an hour or two to recover from a day's worth of messages; in those systems snapshotting every 30 minutes may be more appropriate.
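One rough way to reason about the snapshot interval: after a failure, a node loads the latest snapshot and then replays at most one interval's worth of log. The sketch below uses that simplified model, which assumes constant input and replay rates and ignores snapshot load time (both assumptions), to estimate worst-case replay and the longest interval that meets a recovery target. The class and method names are hypothetical.

```java
// Simplified model (assumption): replay time = messages accumulated since the last snapshot / replay rate.
final class SnapshotIntervalEstimate
{
    // Worst case: the failure happens just before the next snapshot would have been taken.
    static double worstCaseReplaySeconds(
        final double snapshotIntervalSeconds,
        final double inputMessagesPerSecond,
        final double replayMessagesPerSecond)
    {
        return snapshotIntervalSeconds * inputMessagesPerSecond / replayMessagesPerSecond;
    }

    // Longest snapshot interval that keeps worst-case replay within the target.
    static double maxIntervalSeconds(
        final double targetReplaySeconds,
        final double inputMessagesPerSecond,
        final double replayMessagesPerSecond)
    {
        return targetReplaySeconds * replayMessagesPerSecond / inputMessagesPerSecond;
    }
}
```

For example, if a day of input (86,400 s) takes two hours (7,200 s) to replay, replay runs at 12 times the input rate; under this model, snapshotting every 30 minutes bounds replay to about 150 seconds.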
One of the key design goals of Aeron is to build a system that is highly composable. E.g. Aeron Archive, which provides a means to persist an Aeron stream, is used by Aeron Cluster to persist the Raft log, and messages are communicated around the cluster using Aeron’s Media Driver. Therefore Aeron Cluster is an aggregation of a number of existing Aeron components and a few new ones. To run a cluster node successfully it is necessary to have one (or, in the case of the ClusteredServiceContainer, at least one) of each of these components running. Because all communication between these components within a single node uses IPC, they can all be run in the same process, in separate processes, or in any combination.
The MediaDriver is the means by which data is moved to, from, and around the cluster. Aeron Cluster reuses the Publication and Subscription functionality that Aeron already provides to handle all distributed and inter-process communication.
Note
Currently only the Java implementation of the MediaDriver is supported for use in Aeron Cluster.
Raft is primarily a log replication protocol, so Aeron Cluster uses Aeron Archive to persist its log.
The Consensus Module is the key component within Aeron Cluster and ensures that the nodes have a consistent copy of the replicated log. The Consensus Module co-ordinates with the Archive to persist messages, replicates/acknowledges messages to/from other nodes, and delivers messages through to the Clustered Services.
The Clustered Service runs the developer-supplied application logic. There can be one or more clustered services per node. Aeron Cluster provides a container for the application logic; the application must implement the ClusteredService interface, which receives all of the appropriate events and messages from the cluster.
Install JDK 8 or JDK 11 and ensure that $JAVA_HOME is set correctly and $JAVA_HOME/bin is on your $PATH.
The typical way to get started with Aeron in your own application is to use the aeron-all jar. However, for this tutorial we recommend that you check out the source code from GitHub.
In order to look at the examples and run the code from this tutorial you will need to checkout the full Aeron source code, use the appropriate release version and build the projects.
> git clone https://github.com/real-logic/aeron.git
> cd aeron
> git checkout -b my_tutorial 1.25.2-SNAPSHOT
> ./gradlew
The location on your computer where this has been checked out will be referred to as <AERON_HOME>.
This tutorial will include snippets of code from the working example. The scripts and classes for this tutorial are:
- <AERON_HOME>/aeron-samples/src/main/java/io/aeron/samples/tutorial/cluster/
  - BasicAuctionClusteredService.java
  - BasicAuctionClusteredServiceNode.java
  - BasicAuctionClusterClient.java
- <AERON_HOME>/aeron-samples/scripts/cluster/
  - basic-auction-cluster
  - basic-auction-client
You may wish to open these up in your favourite IDE or editor while we proceed with the tutorial.
The first step in setting up a cluster is to implement the application logic. For this tutorial we are going to implement a simple auction service. It will have one auction and will track the best price and the id of the customer that bid that price. To properly demonstrate the state management and recovery features of Cluster it is important to have some functionality that is stateful, rather than something stateless like an echo service.
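The Auction class used by the service is not listed in this tutorial. The following is a minimal sketch of what it could look like, inferred from the calls the service makes later (attemptBid, getBestPrice, getCurrentWinningCustomerId, loadInitialState). The rule that a bid must be strictly higher than the current best price, and the initial values, are assumptions. Note that it has no clocks, randomness or I/O, so it is deterministic.

```java
// Sketch of the auction state; method names follow the calls made by the service,
// but the bidding rule and initial values are assumptions.
final class Auction
{
    private static final long NO_CUSTOMER = -1; // assumed sentinel meaning "no bids yet"

    private long bestPrice = 0;
    private long currentWinningCustomerId = NO_CUSTOMER;

    // Restore state from a snapshot (called before any input messages are processed).
    void loadInitialState(final long price, final long customerId)
    {
        bestPrice = price;
        currentWinningCustomerId = customerId;
    }

    // Deterministic business rule: a bid wins only if it beats the current best price.
    boolean attemptBid(final long price, final long customerId)
    {
        if (price <= bestPrice)
        {
            return false;
        }
        bestPrice = price;
        currentWinningCustomerId = customerId;
        return true;
    }

    long getBestPrice()
    {
        return bestPrice;
    }

    long getCurrentWinningCustomerId()
    {
        return currentWinningCustomerId;
    }
}
```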
Note
In order to show the various features of Cluster in an understandable way we’ve kept the various aspects of the code (data serialisation, application logic, Cluster integration) very close together. It is unlikely that you would do this in a real world application. You would probably want to have greater separation of concerns and perhaps use libraries to handle functionality like serialisation.
We must define the link between the application logic and Aeron Cluster. This is done by implementing the ClusteredService interface. The ClusteredService interface defines a number of callbacks that inform us of messages and lifecycle events as they occur, as well as providing some of the hooks that our service can use to interact with the cluster (e.g. sending response messages and taking snapshots).
public class BasicAuctionClusteredService implements ClusteredService
The clustered service container notifies the service that it has started using the onStart callback. This occurs before any input messages are received, either from log replay or live from a client. It is during this phase that we need to load the initial state of the service. Aeron Cluster passes in an Image that will contain the most recent valid snapshot of the service. The service should take care of deserializing the data from the image and initializing its state. We will come back to the details of how the snapshot is loaded after we’ve looked at how messages are handled and how a snapshot is stored.
public void onStart(final Cluster cluster, final Image snapshotImage)
{
this.cluster = cluster; // (1)
if (null != snapshotImage) // (2)
{
loadSnapshot(cluster, snapshotImage);
}
}
-
Take a reference to the cluster; we will need it later.
-
The snapshot can be null (this occurs the first time that the service is started).
The onSessionMessage callback is the main entry point for requests coming into the cluster. Messages reach here by being published to Cluster’s ingress channel. This method will also be passed messages during replay from a log. It is from within this method that we interact with our application logic. Cluster also provides a reliable timestamp as a parameter to this method. As mentioned earlier, timestamps are one of the challenges of building deterministic systems: use this value as the timestamp within your application state and it will be consistent under replay.
private final Auction auction = new Auction();
public void onSessionMessage(
final ClientSession session,
final long timestamp,
final DirectBuffer buffer,
final int offset,
final int length,
final Header header)
{
final long correlationId = buffer.getLong(offset + CORRELATION_ID_OFFSET); // (1)
final long customerId = buffer.getLong(offset + CUSTOMER_ID_OFFSET);
final long price = buffer.getLong(offset + PRICE_OFFSET);
final boolean bidSucceeded = auction.attemptBid(price, customerId); // (2)
if (null != session) // (3)
{
egressMessageBuffer.putLong(CORRELATION_ID_OFFSET, correlationId); // (4)
egressMessageBuffer.putLong(CUSTOMER_ID_OFFSET, auction.getCurrentWinningCustomerId());
egressMessageBuffer.putLong(PRICE_OFFSET, auction.getBestPrice());
egressMessageBuffer.putByte(BID_SUCCEEDED_OFFSET, bidSucceeded ? (byte)1 : (byte)0);
while (session.offer(egressMessageBuffer, 0, EGRESS_MESSAGE_LENGTH) < 0) // (5)
{
cluster.idle(); // (6)
}
}
}
For our input message we have 3 fields: a correlationId, which is the customer-supplied identifier for the message; a customerId, to identify the customer placing the bid; and a price (we’ve used a long for this, which represents the value in cents).
-
Pull the data out of the message. This is similar to the pattern used in an Aeron Subscription’s onFragment callback when doing a poll.
-
Execute the business logic; in our case this applies the incoming bid to the auction to see if it is a winner.
-
The ClientSession allows the service to get information about the calling client and also provides a means to return responses to the client. However, it will be null during recovery, so we need to check for that case and not offer a response.
-
Serialise the response message.
-
Calling offer on the client session will send the response on the egress channel. Make sure that the return value of offer is checked, as it is a non-blocking call and is not guaranteed to succeed.
-
When doing any busy loops within the clustered application, use Cluster::idle(int) within the wait loop to allow the service to pause in a friendly manner. It will take care of handling thread interrupts and ensure the node fails correctly.
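The offset constants (CORRELATION_ID_OFFSET and friends) are defined in the sample source rather than shown here. As an Aeron-free sketch, the layout below assumes three consecutive 8-byte longs for the bid message, plus a 1-byte success flag for the response; the exact offsets and lengths are assumptions, and java.nio.ByteBuffer stands in for Agrona's DirectBuffer (both offer absolute putLong/getLong by index).

```java
import java.nio.ByteBuffer;

// Assumed layout (not necessarily the sample's exact constants):
// [0..7] correlationId, [8..15] customerId, [16..23] price, [24] bidSucceeded (egress only)
final class AuctionMessages
{
    static final int CORRELATION_ID_OFFSET = 0;
    static final int CUSTOMER_ID_OFFSET = CORRELATION_ID_OFFSET + Long.BYTES;
    static final int PRICE_OFFSET = CUSTOMER_ID_OFFSET + Long.BYTES;
    static final int BID_MESSAGE_LENGTH = PRICE_OFFSET + Long.BYTES;
    static final int BID_SUCCEEDED_OFFSET = BID_MESSAGE_LENGTH;
    static final int EGRESS_MESSAGE_LENGTH = BID_SUCCEEDED_OFFSET + Byte.BYTES;

    // Encode an ingress bid with absolute puts, mirroring buffer.putLong(offset, value).
    static ByteBuffer encodeBid(final long correlationId, final long customerId, final long price)
    {
        final ByteBuffer buffer = ByteBuffer.allocate(BID_MESSAGE_LENGTH);
        buffer.putLong(CORRELATION_ID_OFFSET, correlationId);
        buffer.putLong(CUSTOMER_ID_OFFSET, customerId);
        buffer.putLong(PRICE_OFFSET, price);
        return buffer;
    }

    // Encode an egress response: echo the correlationId plus the auction outcome.
    static ByteBuffer encodeResponse(
        final long correlationId, final long winningCustomerId, final long bestPrice, final boolean bidSucceeded)
    {
        final ByteBuffer buffer = ByteBuffer.allocate(EGRESS_MESSAGE_LENGTH);
        buffer.putLong(CORRELATION_ID_OFFSET, correlationId);
        buffer.putLong(CUSTOMER_ID_OFFSET, winningCustomerId);
        buffer.putLong(PRICE_OFFSET, bestPrice);
        buffer.put(BID_SUCCEEDED_OFFSET, bidSucceeded ? (byte)1 : (byte)0);
        return buffer;
    }

    // Decode a field at offset + fieldOffset, as onSessionMessage does with buffer.getLong(offset + ...).
    static long readLong(final ByteBuffer buffer, final int offset, final int fieldOffset)
    {
        return buffer.getLong(offset + fieldOffset);
    }
}
```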
As was mentioned earlier, we need to regularly take snapshots of the service’s state in order to reduce the mean time to recovery and facilitate release migration. There is a callback, onTakeSnapshot, that will be called when it is time to snapshot the state of the service.
Aeron Cluster provides an ExclusivePublication to write a snapshot as the serialised representation of the application logic’s state. For real world applications snapshotting can become tricky. The two big concerns will be ensuring that snapshots are written in a consistent manner and dealing with fragmentation of the application state across messages. For now, our state is so simple that neither of those will impact us.
public void onTakeSnapshot(final ExclusivePublication snapshotPublication)
{
snapshotBuffer.putLong(CUSTOMER_ID_OFFSET, auction.getCurrentWinningCustomerId()); // (1)
snapshotBuffer.putLong(PRICE_OFFSET, auction.getBestPrice());
while (snapshotPublication.offer(snapshotBuffer, 0, SNAPSHOT_MESSAGE_LENGTH) < 0) // (2)
{
cluster.idle();
}
}
-
Write the persistent part of the application logic to a message buffer. In our case, the currently winning customer id and bid price.
-
Write the message to the publication. Again, we need to check the return value of the offer call and use Cluster::idle inside the busy wait loop.
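The "offer, check the result, idle, retry" pattern recurs in onSessionMessage, onTakeSnapshot and, later, the client's sendBid. Below is a dependency-free sketch of that loop. The Offerer and Idler interfaces are hypothetical stand-ins for an offer call (a negative result meaning the message was not accepted, e.g. due to back pressure) and for Cluster::idle or an IdleStrategy. Unlike the tutorial's loops, it gives up after a bounded number of attempts; that is a design choice made here for illustration, since some negative results (such as a closed publication) will never succeed on retry.

```java
// Hypothetical stand-in for an offer call: a result < 0 means "not accepted".
interface Offerer
{
    long offer();
}

// Hypothetical stand-in for Cluster::idle / IdleStrategy::idle.
interface Idler
{
    void idle();
}

final class OfferRetry
{
    // Retry until the offer is accepted or maxAttempts is reached; returns the last result.
    static long offerWithRetry(final Offerer offerer, final Idler idler, final int maxAttempts)
    {
        long result = offerer.offer();
        int attempts = 1;
        while (result < 0 && attempts < maxAttempts)
        {
            idler.idle(); // back off instead of spinning hot
            result = offerer.offer();
            attempts++;
        }
        return result;
    }
}
```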
Our onStart implementation contained a method to load the snapshot. Now that we have seen how the snapshot is written, we can look at how it is loaded. The Image provided to onStart has a method that indicates when there is no more data available. However, application code should also encode enough information that the end of the stream can be detected from the stored messages. The two approaches can then be used together as a sanity check that all of the data has been received.
private void loadSnapshot(final Cluster cluster, final Image snapshotImage)
{
final MutableBoolean allDataLoaded = new MutableBoolean(false);
while (!snapshotImage.isEndOfStream()) // (1)
{
final int fragmentsPolled = snapshotImage.poll((buffer, offset, length, header) -> // (2)
{
assert length >= SNAPSHOT_MESSAGE_LENGTH; // (3)
final long customerId = buffer.getLong(offset + SNAPSHOT_CUSTOMER_ID_OFFSET);
final long price = buffer.getLong(offset + SNAPSHOT_PRICE_OFFSET);
auction.loadInitialState(price, customerId); // (4)
allDataLoaded.set(true);
}, 1);
if (allDataLoaded.value) // (5)
{
break;
}
cluster.idle(fragmentsPolled); // (6)
}
assert snapshotImage.isEndOfStream(); // (7)
assert allDataLoaded.value;
}
-
The method Image::isEndOfStream can be used to determine if there is going to be any more input.
-
Because our snapshot is a stream of messages written to a publication, we use the Image’s poll method to extract data from the snapshot.
-
Our total snapshot length (16 bytes) is going to be smaller than any reasonable MTU, therefore we can assume that all of the data will arrive in a single message.
-
Once all of the data is loaded we can initialise the application logic state from the snapshot.
-
Once we’ve loaded all of the data for the application we can break out of the snapshot loading loop.
-
Again, make sure we use Cluster::idle in tight loops. It can take time for the snapshot to be propagated to the service, so the number of fragments polled can be zero.
-
It is also worthwhile having some sanity checks; these ensure that the snapshot store/load code and Aeron agree on where the end of the input data is. We’ve used asserts, but other mechanisms (e.g. log messages or exceptions) could also be used to indicate an issue.
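To see the store/load symmetry and the two end-of-data checks in isolation, here is an Aeron-free sketch. The snapshot is written as a single 16-byte message (customer id, then price), and a hypothetical FakeImage, simply a queue of received fragments, stands in for an Image, with isEndOfStream() returning true once all fragments are consumed. The offsets, the queue model and the use of exceptions instead of asserts are assumptions.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

final class SnapshotRoundTrip
{
    static final int SNAPSHOT_CUSTOMER_ID_OFFSET = 0;
    static final int SNAPSHOT_PRICE_OFFSET = SNAPSHOT_CUSTOMER_ID_OFFSET + Long.BYTES;
    static final int SNAPSHOT_MESSAGE_LENGTH = SNAPSHOT_PRICE_OFFSET + Long.BYTES; // 16 bytes

    // Hypothetical stand-in for an Image: a queue of already-received fragments.
    static final class FakeImage
    {
        private final Deque<ByteBuffer> fragments = new ArrayDeque<>();

        void append(final ByteBuffer fragment)
        {
            fragments.addLast(fragment);
        }

        boolean isEndOfStream()
        {
            return fragments.isEmpty();
        }

        ByteBuffer pollFragment()
        {
            return fragments.pollFirst();
        }
    }

    // Mirrors onTakeSnapshot: one fixed-size message holding the persistent state.
    static ByteBuffer writeSnapshot(final long customerId, final long price)
    {
        final ByteBuffer buffer = ByteBuffer.allocate(SNAPSHOT_MESSAGE_LENGTH);
        buffer.putLong(SNAPSHOT_CUSTOMER_ID_OFFSET, customerId);
        buffer.putLong(SNAPSHOT_PRICE_OFFSET, price);
        return buffer;
    }

    // Mirrors loadSnapshot: returns {customerId, price}; fails if the two end-of-data signals disagree.
    static long[] loadSnapshot(final FakeImage image)
    {
        boolean allDataLoaded = false;
        long customerId = 0;
        long price = 0;
        while (!allDataLoaded && !image.isEndOfStream())
        {
            final ByteBuffer fragment = image.pollFragment();
            if (fragment.capacity() < SNAPSHOT_MESSAGE_LENGTH)
            {
                throw new IllegalStateException("Snapshot fragment too short");
            }
            customerId = fragment.getLong(SNAPSHOT_CUSTOMER_ID_OFFSET);
            price = fragment.getLong(SNAPSHOT_PRICE_OFFSET);
            allDataLoaded = true; // the whole snapshot fits in a single message
        }
        if (!allDataLoaded || !image.isEndOfStream())
        {
            throw new IllegalStateException("Snapshot data and end of stream disagree");
        }
        return new long[] {customerId, price};
    }
}
```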
Now we have our application implemented we can move onto running it in a cluster. Because there are a number of moving parts to setting up a cluster node, one of the trickiest parts of using Aeron Cluster is getting the configuration correct. We are going to start with a static three-node cluster with a simplified configuration where all of the components (MediaDriver, Archive, ConsensusModule and ClusteredServiceContainer) for a single node are hosted in a single process. We will then start the cluster as three separate processes.
In a production deployment, you will likely want to run the 3 instances on three separate servers; however, for our example we just want to run the services on a single machine. This does make port allocation a concern, as each node within the cluster needs to bind to a number of ports, so we need to make sure there are no clashes. We could do this with VMs or containers, but in the interest of simplicity we are going to specify a port range for each node, i.e.:
-
Node 0, ports: 9000-9099
-
Node 1, ports: 9100-9199
-
Node 2, ports: 9200-9299
private static final int PORT_BASE = 9000;
private static final int PORTS_PER_NODE = 100;
private static final int ARCHIVE_CONTROL_REQUEST_PORT_OFFSET = 1;
private static final int ARCHIVE_CONTROL_RESPONSE_PORT_OFFSET = 2;
public static final int CLIENT_FACING_PORT_OFFSET = 3;
private static final int MEMBER_FACING_PORT_OFFSET = 4;
private static final int LOG_PORT_OFFSET = 5;
private static final int TRANSFER_PORT_OFFSET = 6;
private static final int LOG_CONTROL_PORT_OFFSET = 7;
static int calculatePort(final int nodeId, final int offset)
{
return PORT_BASE + (nodeId * PORTS_PER_NODE) + offset;
}
private static String udpChannel(final int nodeId, final String hostname, final int portOffset)
{
final int port = calculatePort(nodeId, portOffset);
return new ChannelUriStringBuilder()
.media("udp")
.termLength(64 * 1024)
.endpoint(hostname + ":" + port)
.build();
}
While we don’t really need 100 ports for each node (closer to 10), it does make it clear which ports are assigned to each service. Each endpoint will be an offset from the first port in a node’s range. E.g. the archive control request port has an offset of 1, so for Node 2 it will have port 9201.
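The sketch below reuses the same arithmetic as calculatePort (constants copied from the listing above) and adds a hypothetical isClashFree check that confirms no two endpoints across the nodes map to the same port. It also makes the implicit rule visible: every offset must stay below PORTS_PER_NODE, otherwise one node's range spills into the next.

```java
import java.util.HashSet;
import java.util.Set;

final class PortPlan
{
    static final int PORT_BASE = 9000;
    static final int PORTS_PER_NODE = 100;

    // Same arithmetic as calculatePort in the tutorial.
    static int calculatePort(final int nodeId, final int offset)
    {
        return PORT_BASE + (nodeId * PORTS_PER_NODE) + offset;
    }

    // Hypothetical check: true if every (node, offset) pair maps to a distinct port.
    static boolean isClashFree(final int nodeCount, final int[] offsets)
    {
        final Set<Integer> ports = new HashSet<>();
        for (int nodeId = 0; nodeId < nodeCount; nodeId++)
        {
            for (final int offset : offsets)
            {
                if (!ports.add(calculatePort(nodeId, offset)))
                {
                    return false; // two endpoints would bind the same port
                }
            }
        }
        return true;
    }
}
```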
To start the cluster node we are going to define our own class with a main method; it will construct all of the contexts for the necessary components and then start them all. As indicated above, we are going to give each node of the cluster a unique id (0, 1, and 2). We are also going to use this value as the cluster member id.
public class BasicAuctionClusteredServiceNode
public static void main(final String[] args)
{
final int nodeId = parseInt(System.getProperty("aeron.tutorial.cluster.nodeId")); // (1)
final List<String> hostnames = Arrays.asList("localhost", "localhost", "localhost"); // (2)
final String hostname = hostnames.get(nodeId);
final File baseDir = new File(System.getProperty("user.dir"), "node" + nodeId); // (3)
final String aeronDirName = CommonContext.getAeronDirectoryName() + "-" + nodeId + "-driver";
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier(); // (4)
-
Pass in the node id as a system property (aeron.tutorial.cluster.nodeId) so that we can reuse the code for each instance of the service.
-
Define a set of hostnames for the cluster. For now everything is running on localhost, but we could put actual hostnames or IP addresses in this list and run the same example across multiple servers.
-
For each node we need a location on disk that will hold the persistent data. This will include the Raft log, mark files for each service component, and the recording log used to track snapshots within the log. In a production system this location is likely to be fairly important, as you will want to map it to a fast disk in order to get the best performance out of the system.
-
Create a shutdown barrier that will be used to trap exit signals and allow the service to exit cleanly.
final MediaDriver.Context mediaDriverContext = new MediaDriver.Context()
.aeronDirectoryName(aeronDirName)
.threadingMode(ThreadingMode.SHARED)
.termBufferSparseFile(true)
.multicastFlowControlSupplier(new MinMulticastFlowControlSupplier())
.terminationHook(barrier::signal)
.errorHandler(BasicAuctionClusteredServiceNode.errorHandler("Media Driver"));
Note that we’ve specified the custom aeronDirName to allow multiple Media Drivers on the same host. Otherwise, there is nothing Cluster-specific in the Media Driver configuration.
final Archive.Context archiveContext = new Archive.Context()
.aeronDirectoryName(aeronDirName)
.archiveDir(new File(baseDir, "archive"))
.controlChannel(udpChannel(nodeId, "localhost", ARCHIVE_CONTROL_REQUEST_PORT_OFFSET))
.localControlChannel("aeron:ipc?term-length=64k")
.recordingEventsEnabled(false)
.threadingMode(ArchiveThreadingMode.SHARED);
Again, there is nothing Cluster-specific in the Archive configuration. We’ve used the same aeronDirName as used by the Media Driver. The controlChannel uses a node-specific port in the construction of its UDP channel.
final AeronArchive.Context aeronArchiveContext = new AeronArchive.Context()
.controlRequestChannel(archiveContext.controlChannel())
.controlRequestStreamId(archiveContext.controlStreamId())
.controlResponseChannel(udpChannel(nodeId, "localhost", ARCHIVE_CONTROL_RESPONSE_PORT_OFFSET))
.aeronDirectoryName(aeronDirName);
Because Cluster’s Consensus Module needs to persist its log to support the Raft protocol, it needs a client for the Archive constructed above. The client needs a channel to receive responses back from the Archive, so again we are using a node-specific port to prevent collisions.
final ConsensusModule.Context consensusModuleContext = new ConsensusModule.Context()
.errorHandler(errorHandler("Consensus Module"))
.clusterMemberId(nodeId) // (1)
.clusterMembers(clusterMembers(hostnames)) // (2)
.aeronDirectoryName(aeronDirName) // (3)
.clusterDir(new File(baseDir, "consensus-module")) // (4)
.ingressChannel("aeron:udp?term-length=64k") // (5)
.logChannel(logControlChannel(nodeId, hostname, LOG_CONTROL_PORT_OFFSET)) // (6)
.archiveContext(aeronArchiveContext.clone()); // (7)
This is the first of the two configuration sections that are specific to Cluster. The Consensus Module is responsible for handling the main aspects of the Raft protocol, e.g. leader election, ensuring consensus-based consistency of the data in the log, and passing properly replicated messages on to the Clustered Service Container.
-
Each Consensus Module needs an identifier within the cluster; we use the nodeId that we have used to separate each of the nodes.
-
This is probably the trickiest part of the configuration. It specifies all of the static members of the cluster along with all of the endpoints that they require for the various operations of the Consensus Module. These are encoded into a single string of the form:
0,client-facing:port,member-facing:port,log:port,transfer:port,archive:port|\
1,client-facing:port,member-facing:port,log:port,transfer:port,archive:port|...
Each leading numeric value is a member id for the cluster (as specified via the clusterMemberId method). The values for each endpoint represent:
  -
  client-facing, where clients connect for the ingress channel.
  -
  member-facing, where other members of the cluster connect.
  -
  log, the endpoint that the log is replicated to.
  -
  transfer, used for a stream that members can use to catch up to a leader.
  -
  archive, the same endpoint used to control the Archive running on this node.
In our example we’ve used the same hostname (localhost) for each of the endpoints; however, each endpoint allows the specification of a host, so that traffic could potentially be separated if required, e.g. running member-facing traffic on a separate network from the client-facing traffic.
-
Use the node-specific Media Driver.
-
Specify the data directory for the Consensus Module. Make sure it is node-specific.
-
Define the ingress channel for the cluster. Note that this value does not need to be the full channel URI. It can be a template that specifies the parameters but excludes the endpoint, which will be filled in using the value from the cluster members string as appropriate for this node.
-
Specify the log channel. Like the ingress channel, this is specified as a template. This channel needs to be a multi-destination-cast or multicast channel. In our example we are using multi-destination-cast with manual configuration, hence we need a separate control endpoint (including a port) used for adding and removing destinations to and from the publication.
-
Clone the Archive Client context that the Consensus Module will use to talk to the Archive.
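The clusterMembers(hostnames) helper is not listed in this tutorial. A hypothetical version, built from the string format described above and the port offsets defined earlier (client-facing 3, member-facing 4, log 5, transfer 6, archive control request 1), might look like the sketch below; treat it as an illustration and check the sample source for the exact implementation.

```java
import java.util.List;

// Hypothetical helper; the sample's real clusterMembers(hostnames) may differ in detail.
final class ClusterMembersSketch
{
    static final int PORT_BASE = 9000;
    static final int PORTS_PER_NODE = 100;
    static final int ARCHIVE_CONTROL_REQUEST_PORT_OFFSET = 1;
    static final int CLIENT_FACING_PORT_OFFSET = 3;
    static final int MEMBER_FACING_PORT_OFFSET = 4;
    static final int LOG_PORT_OFFSET = 5;
    static final int TRANSFER_PORT_OFFSET = 6;

    static int calculatePort(final int nodeId, final int offset)
    {
        return PORT_BASE + (nodeId * PORTS_PER_NODE) + offset;
    }

    private static void appendEndpoint(final StringBuilder sb, final String host, final int nodeId, final int offset)
    {
        sb.append(',').append(host).append(':').append(calculatePort(nodeId, offset));
    }

    // Builds "id,client-facing,member-facing,log,transfer,archive" entries separated by '|'.
    static String clusterMembers(final List<String> hostnames)
    {
        final StringBuilder sb = new StringBuilder();
        for (int nodeId = 0; nodeId < hostnames.size(); nodeId++)
        {
            if (nodeId > 0)
            {
                sb.append('|');
            }
            final String host = hostnames.get(nodeId);
            sb.append(nodeId);
            appendEndpoint(sb, host, nodeId, CLIENT_FACING_PORT_OFFSET);
            appendEndpoint(sb, host, nodeId, MEMBER_FACING_PORT_OFFSET);
            appendEndpoint(sb, host, nodeId, LOG_PORT_OFFSET);
            appendEndpoint(sb, host, nodeId, TRANSFER_PORT_OFFSET);
            appendEndpoint(sb, host, nodeId, ARCHIVE_CONTROL_REQUEST_PORT_OFFSET);
        }
        return sb.toString();
    }
}
```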
The final part of the configuration is the component that will actually run our application logic. It is possible to have multiple Clustered Service Containers per Consensus Module and have them talk to each other using IPC. In our simplified case we just have the one.
final ClusteredServiceContainer.Context clusteredServiceContext =
new ClusteredServiceContainer.Context()
.aeronDirectoryName(aeronDirName) // (1)
.archiveContext(aeronArchiveContext.clone()) // (2)
.clusterDir(new File(baseDir, "service"))
.clusteredService(new BasicAuctionClusteredService()) // (3)
.errorHandler(errorHandler("Clustered Service"));
-
Again we use the node specific Media Driver.
-
And the node’s Archive, via the Archive Client configuration.
-
This is the point where we bind an instance of our application logic to the cluster.
Now that the cluster is configured we can start it running. The code for launching the service is as follows.
try (
ClusteredMediaDriver clusteredMediaDriver = ClusteredMediaDriver.launch(
mediaDriverContext, archiveContext, consensusModuleContext); // (1)
ClusteredServiceContainer container = ClusteredServiceContainer.launch(
clusteredServiceContext)) // (2)
{
System.out.println("[" + nodeId + "] Started Cluster Node on " + hostname + "...");
barrier.await(); // (3)
System.out.println("[" + nodeId + "] Exiting");
}
-
Launches a ClusteredMediaDriver that includes instances of the Media Driver, Archive and Consensus Module.
-
Immediately afterwards we launch a ClusteredServiceContainer, which hosts our application.
-
Use the shutdown barrier to keep our system alive until it is signalled to shut down (e.g. using SIGTERM or SIGINT on Unix).
There is a script provided to launch the cluster. Assuming that you’ve followed the Using the Source From this Tutorial step above, you should be able to do the following:
> cd <AERON_HOME>/aeron-samples/scripts/cluster
> ./basic-auction-cluster
This will spit out a whole lot of logging but should end with something like:
CLUSTER: ELECTION_STATE_CHANGE, FOLLOWER_TRANSITION -> FOLLOWER_READY, memberId=2
CLUSTER: ELECTION_STATE_CHANGE, FOLLOWER_CATCHUP -> FOLLOWER_TRANSITION, memberId=1
CLUSTER: ELECTION_STATE_CHANGE, FOLLOWER_TRANSITION -> FOLLOWER_READY, memberId=1
CLUSTER: ELECTION_STATE_CHANGE, FOLLOWER_READY -> CLOSE, memberId=1
CLUSTER: ELECTION_STATE_CHANGE, FOLLOWER_READY -> CLOSE, memberId=2
CLUSTER: ELECTION_STATE_CHANGE, LEADER_READY -> CLOSE, memberId=0
We now have our new service running in a cluster.
You can shut down the nodes using the following:
> pkill -f BasicAuctionClusteredServiceNode
You should also be able to see the stored data for each of the nodes in the working directory.
> ls
basic-auction-client  basic-auction-cluster  logs  node0  node1  node2  script-common
Let’s look briefly at how the script launches the code.
function startNode() {
${JAVA_HOME}/bin/java \
-cp ../../../aeron-all/build/libs/aeron-all-${VERSION}.jar \
-javaagent:../../../aeron-agent/build/libs/aeron-agent-${VERSION}-all.jar \
-XX:BiasedLockingStartupDelay=0 \
-XX:+UnlockExperimentalVMOptions \
-XX:+TrustFinalNonStaticFields \
-XX:+UnlockDiagnosticVMOptions \
-XX:GuaranteedSafepointInterval=300000 \
-XX:+UseParallelOldGC \
-Daeron.event.cluster.log=all \
-Daeron.tutorial.cluster.nodeId=$1 \
${JVM_OPTS} ${ADD_OPENS} \
io.aeron.samples.tutorial.cluster.BasicAuctionClusteredServiceNode > logs/cluster-$1.log &
}
function startAll() {
startNode 0
startNode 1
startNode 2
tail -F logs/cluster-*.log
}
The -javaagent:../../../aeron-agent/build/libs/aeron-agent-${VERSION}-all.jar option weaves Aeron’s logging agent into the running code. Specifying -Daeron.event.cluster.log=all tells the agent to log all of the events relating to cluster operation.
While we now have our cluster up and running, we can’t do anything with it until we have a client that will interact with the cluster.
First off, we need a way to connect our client to the cluster. All of the messaging between the client and the cluster happens via Aeron, but we have an additional layer called AeronCluster that handles connections to the independent nodes and provides methods for offering messages into the cluster and polling for responses.
public class BasicAuctionClusterClient implements EgressListener
final int egressPort = 19000 + customerId;
try (
MediaDriver mediaDriver = MediaDriver.launchEmbedded(new MediaDriver.Context() // (1)
.threadingMode(ThreadingMode.SHARED)
.dirDeleteOnStart(true)
.dirDeleteOnShutdown(true));
AeronCluster aeronCluster = AeronCluster.connect(new AeronCluster.Context()
.egressListener(client) // (2)
.egressChannel("aeron:udp?endpoint=localhost:" + egressPort) // (3)
.aeronDirectoryName(mediaDriver.aeronDirectoryName())
.ingressChannel("aeron:udp") // (4)
.clusterMemberEndpoints(clusterMembers))) // (5)
{
-
Launch a media driver that will allow communication with the cluster nodes. We’re using an embedded instance here to make it easier to launch multiple drivers on the same machine for use in this tutorial, but it would be perfectly viable to launch a single media driver and have multiple clients on the same host sharing the same media driver. We’re also avoiding using the media drivers that each of the cluster nodes are using.
-
This is where we bind our client application code to the responses from the cluster. This requires implementing the EgressListener interface, which provides callbacks for session messages, session-related events (e.g. errors), and cluster events (e.g. newly elected leaders).
-
Specify the channel to receive egress responses back from the cluster. In our case we are using a UDP unicast response, which will limit responses to the session that sent the ingress message. We are adjusting the port per client to allow for multiple clients on the same machine with multiple media drivers. If a single media driver was used then this would not be necessary.
It is possible to specify a multicast address here if you wanted all clients to see all responses. Some systems may find this useful, e.g. an exchange with multiple gateways may wish to have the execution for a trade sent to a client that was on the passive side of a trade.
-
Specify the ingress channel for the cluster. In our case, as we are using multiple destinations, we need to use the template-style approach that was used when setting up the Consensus Module. We just identify that access to the cluster is via UDP.
-
Identify the actual static endpoints for the cluster. These will be merged into the configuration specified for the ingress channel.
Note
This example uses UDP unicast for the ingress and egress channels. It is possible to use multicast as well, but you will need to be in an environment where that is well supported. In cloud deployments UDP multicast is often unsupported or only available in early forms, so unicast is currently recommended in those environments.
Next we are going to push messages into the cluster itself. So that the client is able to fail over to different nodes as nodes come and go in the cluster, the Aeron publications are wrapped in a class called AeronCluster. This is the second of the two items that are connected at start up. This class provides an API very similar to the Publication API.
private long sendBid(final AeronCluster aeronCluster, final long price)
{
final long correlationId = this.correlationId++;
actionBidBuffer.putLong(CORRELATION_ID_OFFSET, correlationId); // (1)
actionBidBuffer.putLong(CUSTOMER_ID_OFFSET, customerId);
actionBidBuffer.putLong(PRICE_OFFSET, price);
while (aeronCluster.offer(actionBidBuffer, 0, BID_MESSAGE_LENGTH) < 0) // (2)
{
idleStrategy.idle(aeronCluster.pollEgress()); // (3)
}
return correlationId;
}
-
In much that same way that we would send messages to a publication we put the data that we want to send into an Agrona
DirectBuffer
. TheAeronCluster
contains offer methods that behave in the same way as thePublication
offer methods. -
Publish the data in the same way we would using a
Publication
. We must check the return value to ensure that the message has been sent. -
If we fail to send and need to run a busy wait loop, we should use an IdleStrategy to ensure that we don’t inappropriately overuse the CPU. With cluster clients we should also continually poll the egress to pick up any messages that have been sent back from the cluster, including errors or session status messages.
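The send loop in sendBid can be exercised without a running cluster by replacing the Aeron calls with plain functions. In this hypothetical sketch, offer stands in for aeronCluster.offer(...) (a negative result means the message was not sent), pollEgress stands in for aeronCluster.pollEgress(), and BackoffIdle is a simplified stand-in for an Agrona IdleStrategy that spins, then yields, then parks, resetting whenever polling did some work.

```java
import java.util.concurrent.locks.LockSupport;
import java.util.function.IntSupplier;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the loop in sendBid; no Aeron classes are used.
final class OfferRetry
{
    private OfferRetry()
    {
    }

    // Simplified backoff in the spirit of Agrona's BackoffIdleStrategy: spin, then yield, then park.
    static final class BackoffIdle
    {
        private int idleCount;

        void idle(final int workCount)
        {
            if (workCount > 0)
            {
                idleCount = 0; // polling did something useful, so go back to spinning
                return;
            }
            idleCount++;
            if (idleCount < 10)
            {
                Thread.onSpinWait();
            }
            else if (idleCount < 20)
            {
                Thread.yield();
            }
            else
            {
                LockSupport.parkNanos(1_000L);
            }
        }
    }

    // offer: negative result means "not sent" (as with aeronCluster.offer)
    // pollEgress: returns the amount of work done (as with aeronCluster.pollEgress)
    // Returns how many offer attempts were needed.
    static int offerWithRetry(final LongSupplier offer, final IntSupplier pollEgress, final BackoffIdle idleStrategy)
    {
        int attempts = 1;
        while (offer.getAsLong() < 0)
        {
            idleStrategy.idle(pollEgress.getAsInt());
            attempts++;
        }
        return attempts;
    }
}
```

Like the tutorial's loop, this retries on every negative result. A real client may want to treat some results differently; for example, Publication.CLOSED will never succeed on retry.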
As was mentioned previously, our client must have a class that implements the EgressListener interface to receive messages that come back from the cluster. The responses can be application messages, session events (e.g. error messages), or notifications of a new leader. In this tutorial there is not a lot to do; we simply print out the responses that we receive.
public void onMessage(
final long clusterSessionId,
final long timestamp,
final DirectBuffer buffer,
final int offset,
final int length,
final Header header)
{
final long correlationId = buffer.getLong(offset + CORRELATION_ID_OFFSET);
final long customerId = buffer.getLong(offset + CUSTOMER_ID_OFFSET);
final long currentPrice = buffer.getLong(offset + PRICE_OFFSET);
final boolean bidSucceed = 0 != buffer.getByte(offset + BID_SUCCEEDED_OFFSET);
lastBidSeen = currentPrice;
printOutput(
"SessionMessage(" + clusterSessionId + "," + correlationId + "," +
customerId + "," + currentPrice + "," + bidSucceed + ")");
}
public void sessionEvent(
final long correlationId,
final long clusterSessionId,
final long leadershipTermId,
final int leaderMemberId,
final EventCode code,
final String detail)
{
printOutput(
"SessionEvent(" + correlationId + "," + leadershipTermId + "," +
leaderMemberId + "," + code + "," + detail + ")");
}
public void newLeader(
final long clusterSessionId,
final long leadershipTermId,
final int leaderMemberId,
final String memberEndpoints)
{
printOutput(
"New Leader(" + clusterSessionId + "," + leadershipTermId + "," + leaderMemberId + ")");
}
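Both sendBid and onMessage read and write fixed-width fields at constant offsets. The *_OFFSET constants are defined elsewhere in the tutorial, so the layout below (three longs followed by a one-byte success flag) is an assumption. The sketch uses java.nio.ByteBuffer in place of an Agrona buffer, set to native byte order to mirror Agrona's default, and round-trips a response so that decode performs the same reads as onMessage.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical codec for the bid response read in onMessage; offsets are assumed.
final class BidCodec
{
    static final int CORRELATION_ID_OFFSET = 0;
    static final int CUSTOMER_ID_OFFSET = CORRELATION_ID_OFFSET + Long.BYTES;
    static final int PRICE_OFFSET = CUSTOMER_ID_OFFSET + Long.BYTES;
    static final int BID_SUCCEEDED_OFFSET = PRICE_OFFSET + Long.BYTES;
    static final int RESPONSE_LENGTH = BID_SUCCEEDED_OFFSET + 1;

    record Response(long correlationId, long customerId, long price, boolean bidSucceeded)
    {
    }

    private BidCodec()
    {
    }

    // Native order mirrors Agrona buffers, which use the platform byte order by default.
    static ByteBuffer allocate(final int capacity)
    {
        return ByteBuffer.allocate(capacity).order(ByteOrder.nativeOrder());
    }

    static void encode(final ByteBuffer buffer, final int offset, final Response response)
    {
        buffer.putLong(offset + CORRELATION_ID_OFFSET, response.correlationId());
        buffer.putLong(offset + CUSTOMER_ID_OFFSET, response.customerId());
        buffer.putLong(offset + PRICE_OFFSET, response.price());
        buffer.put(offset + BID_SUCCEEDED_OFFSET, (byte)(response.bidSucceeded() ? 1 : 0));
    }

    // Same reads as onMessage, always relative to the offset the payload starts at.
    static Response decode(final ByteBuffer buffer, final int offset)
    {
        return new Response(
            buffer.getLong(offset + CORRELATION_ID_OFFSET),
            buffer.getLong(offset + CUSTOMER_ID_OFFSET),
            buffer.getLong(offset + PRICE_OFFSET),
            0 != buffer.get(offset + BID_SUCCEEDED_OFFSET));
    }
}
```

Every read adds the payload's offset, as onMessage does, because the message does not generally start at index 0 of the buffer handed to the listener.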
Now that we have all of the pieces in place, we can run the cluster and client together and see how the cluster behaves. You will need at least three terminal windows, as the one that we use to start the cluster will be left open showing the cluster's logging information.
First, start the cluster.
> cd <AERON_HOME>/aeron-samples/scripts/cluster
> ./basic-auction-cluster
Then wait until you see a message indicating that one of the nodes is now the leader:
[57240.190127954] CLUSTER: ELECTION_STATE_CHANGE [29/29]: memberId=1, LEADER_READY -> CLOSE
Now, in a second terminal window, run a client that will place some bids into our cluster-hosted auction. The 10 parameter is the customer id to use for the client; change this number if you want to run multiple clients.
> cd <AERON_HOME>/aeron-samples/scripts/cluster
> ./basic-auction-client 10
We should see some output similar to the following:
Sent (8474093274723075706,10,105) bidsRemaining = 9
SessionMessage(1,8474093274723075706,10,105,true)
Sent (8474093274723075707,10,109) bidsRemaining = 8
SessionMessage(1,8474093274723075707,10,109,true)
Sent (8474093274723075708,10,110) bidsRemaining = 7
SessionMessage(1,8474093274723075708,10,110,true)
Sent (8474093274723075709,10,119) bidsRemaining = 6
SessionMessage(1,8474093274723075709,10,119,true)
Sent (8474093274723075710,10,122) bidsRemaining = 5
SessionMessage(1,8474093274723075710,10,122,true)
Sent (8474093274723075711,10,127) bidsRemaining = 4
SessionMessage(1,8474093274723075711,10,127,true)
Sent (8474093274723075712,10,129) bidsRemaining = 3
SessionMessage(1,8474093274723075712,10,129,true)
Sent (8474093274723075713,10,135) bidsRemaining = 2
SessionMessage(1,8474093274723075713,10,135,true)
Sent (8474093274723075714,10,136) bidsRemaining = 1
SessionMessage(1,8474093274723075714,10,136,true)
Sent (8474093274723075715,10,142) bidsRemaining = 0
SessionMessage(1,8474093274723075715,10,142,true)
Now that the cluster is up and running and we are able to send and receive messages, we can look at some of the additional tools that are available for Cluster and experiment with some of the failover functionality.
The first thing that we are interested in is which of the three nodes is the leader. With this information we can shut down the leader and see whether our client is able to recover.
> java -cp <AERON_HOME>/build/libs/aeron-all-1.25.2-SNAPSHOT.jar \
    io.aeron.cluster.ClusterTool node0/consensus-module list-members
This will show a list of all of the members in the cluster, similar to the following:
currentTimeNs=1579564309666000000, leaderMemberId=0, memberId=0, activeMembers=[ClusterMember{isBallotSent=false, isLeader=false, hasRequestedJoin=false, id=0, leadershipTermId=5, logPosition=11872, candidateTermId=-1, catchupReplaySessionId=-1, correlationId=-1, removalPosition=-1, timeOfLastAppendPositionNs=1579564309666000000, clientFacingEndpoint='localhost:9003', memberFacingEndpoint='localhost:9004', logEndpoint='localhost:9005', transferEndpoint='localhost:9006', archiveEndpoint='localhost:9001', endpointsDetail='localhost:9003,localhost:9004,localhost:9005,localhost:9006,localhost:9001', publication=null, vote=null}, ClusterMember{isBallotSent=false, isLeader=false, hasRequestedJoin=false, id=1, leadershipTermId=5, logPosition=11872, candidateTermId=-1, catchupReplaySessionId=-1, correlationId=-1, removalPosition=-1, timeOfLastAppendPositionNs=1579564309495000000, clientFacingEndpoint='localhost:9103', memberFacingEndpoint='localhost:9104', logEndpoint='localhost:9105', transferEndpoint='localhost:9106', archiveEndpoint='localhost:9101', endpointsDetail='localhost:9103,localhost:9104,localhost:9105,localhost:9106,localhost:9101', publication=null, vote=null}, ClusterMember{isBallotSent=false, isLeader=false, hasRequestedJoin=false, id=2, leadershipTermId=5, logPosition=11872, candidateTermId=-1, catchupReplaySessionId=-1, correlationId=-1, removalPosition=-1, timeOfLastAppendPositionNs=1579564309552000000, clientFacingEndpoint='localhost:9203', memberFacingEndpoint='localhost:9204', logEndpoint='localhost:9205', transferEndpoint='localhost:9206', archiveEndpoint='localhost:9201', endpointsDetail='localhost:9203,localhost:9204,localhost:9205,localhost:9206,localhost:9201', publication=null, vote=null}], passiveMembers=[]
The part we are interested in is the leaderMemberId, which identifies the leader so that we can shut it down. We can discover the pid for that member using the following command:
> java -cp <AERON_HOME>/build/libs/aeron-all-1.25.2-SNAPSHOT.jar \
    io.aeron.cluster.ClusterTool node<leaderMemberId>/consensus-module pid
15060
Note that the argument following ClusterTool is the directory where the Consensus Module’s data is stored; we have substituted the leaderMemberId to locate the correct directory for the leader.
The command prints a single number, which is the pid of the leader process (15060 in the example above).
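If you want to script the two steps above, the leader's id can be pulled out of the list-members output and turned into the directory argument. This is a hypothetical helper: it relies on the text format printed by this version of ClusterTool, which may change between Aeron releases, and on the node<memberId>/consensus-module directory layout used by the sample scripts.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: scrapes leaderMemberId from ClusterTool list-members output.
final class LeaderLookup
{
    // Matches the top-level "leaderMemberId=<n>" field; per-member entries use other field names.
    private static final Pattern LEADER_MEMBER_ID = Pattern.compile("leaderMemberId=(-?\\d+)");

    private LeaderLookup()
    {
    }

    static OptionalInt leaderMemberId(final String listMembersOutput)
    {
        final Matcher matcher = LEADER_MEMBER_ID.matcher(listMembersOutput);
        return matcher.find() ? OptionalInt.of(Integer.parseInt(matcher.group(1))) : OptionalInt.empty();
    }

    // Directory layout assumed from the sample scripts: node<memberId>/consensus-module.
    static String consensusModuleDir(final int memberId)
    {
        return "node" + memberId + "/consensus-module";
    }
}
```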
Next, start the client again, but this time let it run a bit longer and attempt to trigger a leader election within the cluster while it is running.
> cd <AERON_HOME>/aeron-samples/scripts/cluster
> ./basic-auction-client 10 1000 3000
Once the client is running, switch to a different terminal and kill the leader process using the pid from the earlier step.
> kill <pid>
The system will stall for a short while until the followers time out the leader and decide to elect a new one. The logs for the cluster will contain something similar to:
==> logs/cluster-0.log <==
[0] Exiting
Consensus Module io.aeron.cluster.client.ClusterException: heartbeat timeout from leader
    at io.aeron.cluster.ConsensusModuleAgent.slowTickWork(ConsensusModuleAgent.java:1884)
    at io.aeron.cluster.ConsensusModuleAgent.doWork(ConsensusModuleAgent.java:288)
    at org.agrona.concurrent.AgentRunner.doDutyCycle(AgentRunner.java:283)
    at org.agrona.concurrent.AgentRunner.run(AgentRunner.java:164)
    at java.base/java.lang.Thread.run(Thread.java:834)
Consensus Module io.aeron.cluster.client.ClusterException: heartbeat timeout from leader
    at io.aeron.cluster.ConsensusModuleAgent.slowTickWork(ConsensusModuleAgent.java:1884)
    at io.aeron.cluster.ConsensusModuleAgent.doWork(ConsensusModuleAgent.java:288)
    at org.agrona.concurrent.AgentRunner.doDutyCycle(AgentRunner.java:283)
    at org.agrona.concurrent.AgentRunner.run(AgentRunner.java:164)
    at java.base/java.lang.Thread.run(Thread.java:834)
...
[65188.006731058] CLUSTER: ELECTION_STATE_CHANGE [42/42]: memberId=1, FOLLOWER_BALLOT -> FOLLOWER_REPLAY
[65188.006791979] CLUSTER: ELECTION_STATE_CHANGE [54/54]: memberId=1, FOLLOWER_REPLAY -> FOLLOWER_CATCHUP_TRANSITION
[65188.009951316] CLUSTER: ELECTION_STATE_CHANGE [55/55]: memberId=1, FOLLOWER_CATCHUP_TRANSITION -> FOLLOWER_CATCHUP
[65188.028244817] CLUSTER: ELECTION_STATE_CHANGE [47/47]: memberId=1, FOLLOWER_CATCHUP -> FOLLOWER_TRANSITION
[65188.028355173] CLUSTER: ELECTION_STATE_CHANGE [45/45]: memberId=1, FOLLOWER_TRANSITION -> FOLLOWER_READY
[65188.02995347] CLUSTER: ELECTION_STATE_CHANGE [31/31]: memberId=1, FOLLOWER_READY -> CLOSE
==> logs/cluster-2.log <==
[65188.033401987] CLUSTER: ELECTION_STATE_CHANGE [29/29]: memberId=2, LEADER_READY -> CLOSE
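This shows that a new leader has been elected to run the cluster. At this point the cluster has no redundancy: only two nodes are working, which is exactly the quorum for a three-node cluster, so the failure of one more node would leave it unable to make progress.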
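The arithmetic behind that statement is the quorum rule from the start of this tutorial, ⌊n/2⌋ + 1. A small illustrative sketch (the Quorum class is hypothetical and not part of Aeron):

```java
// Illustrative only: quorum arithmetic for a Raft-style cluster of n members.
final class Quorum
{
    private Quorum()
    {
    }

    // floor(n / 2) + 1; integer division floors for non-negative n.
    static int quorum(final int members)
    {
        return members / 2 + 1;
    }

    // How many members can fail while the remainder can still form a quorum.
    static int tolerableFailures(final int members)
    {
        return members - quorum(members);
    }

    static boolean hasQuorum(final int members, final int available)
    {
        return available >= quorum(members);
    }
}
```

For the three-node cluster, quorum(3) is 2 and tolerableFailures(3) is 1, so hasQuorum(3, 2) still holds with the leader killed but hasQuorum(3, 1) does not. The same arithmetic shows that a four-node cluster (quorum 3) also tolerates only one failure.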
The client should have automatically resumed sending messages into the cluster and receiving responses. Depending on timing, the client may or may not log an exception.
Sent (-7819190107498904370,10,12969) bidsRemaining = 980
SessionMessage(20,-7819190107498904370,10,12969,true)
Sent (-7819190107498904369,10,12969) bidsRemaining = 979
Sent (-7819190107498904368,10,12970) bidsRemaining = 978
Sent (-7819190107498904367,10,12978) bidsRemaining = 977
Sent (-7819190107498904366,10,12975) bidsRemaining = 976
Sent (-7819190107498904365,10,12971) bidsRemaining = 975
Consensus Module io.aeron.cluster.client.ClusterException: heartbeat timeout from leader
    at io.aeron.cluster.ConsensusModuleAgent.slowTickWork(ConsensusModuleAgent.java:1884)
    at io.aeron.cluster.ConsensusModuleAgent.doWork(ConsensusModuleAgent.java:288)
    at org.agrona.concurrent.AgentRunner.doDutyCycle(AgentRunner.java:283)
    at org.agrona.concurrent.AgentRunner.run(AgentRunner.java:164)
    at java.lang.Thread.run(Thread.java:748)
New Leader(20,7,1)
Sent (-7819190107498904364,10,12977) bidsRemaining = 974
Sent (-7819190107498904363,10,12974) bidsRemaining = 973
SessionMessage(20,-7819190107498904364,10,12977,true)
SessionMessage(20,-7819190107498904363,10,12977,false)
Important
|
If you look carefully at the correlationId values, you will notice that there are no SessionMessage log entries
for a number of the messages that were sent. This is because, when the leader fails, some of the messages being sent into
the cluster can be lost. For this reason it is important that the client tracks responses to its messages and
provides timeouts to the caller if required.
|
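One way to do that tracking is to record a deadline for each correlationId when it is sent, clear it when a SessionMessage carrying the same correlationId arrives in onMessage, and periodically expire whatever is left. PendingRequests is a hypothetical, JDK-only sketch; the timeout value and the clock passed in (for example System.nanoTime()) are left to the application.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical client-side helper (not part of Aeron) for spotting bids that never got a response.
final class PendingRequests
{
    private final long timeoutNs;
    // Insertion order is kept so expired ids are reported in the order they were sent.
    private final Map<Long, Long> deadlineNsByCorrelationId = new LinkedHashMap<>();

    PendingRequests(final long timeoutNs)
    {
        this.timeoutNs = timeoutNs;
    }

    // Call after aeronCluster.offer(...) succeeds.
    void onSent(final long correlationId, final long nowNs)
    {
        deadlineNsByCorrelationId.put(correlationId, nowNs + timeoutNs);
    }

    // Call from onMessage; returns false for unknown or already-expired ids.
    boolean onResponse(final long correlationId)
    {
        return null != deadlineNsByCorrelationId.remove(correlationId);
    }

    // Removes and returns every request whose deadline has passed.
    List<Long> expire(final long nowNs)
    {
        final List<Long> expired = new ArrayList<>();
        final Iterator<Map.Entry<Long, Long>> it = deadlineNsByCorrelationId.entrySet().iterator();
        while (it.hasNext())
        {
            final Map.Entry<Long, Long> entry = it.next();
            // Subtract-then-compare stays correct if nanoTime-style values overflow.
            if (nowNs - entry.getValue() >= 0)
            {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    int pendingCount()
    {
        return deadlineNsByCorrelationId.size();
    }
}
```

The deadline check uses nowNs - deadline >= 0 rather than nowNs >= deadline because System.nanoTime() values are only meaningful as differences and may overflow.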
We can now bring the failed node back into service:
> cd <AERON_HOME>/aeron-samples/scripts/cluster
> ./basic-auction-cluster <leaderMemberId>
where leaderMemberId is the id of the member that was killed (the old leader, not the new leader of the cluster). The cluster logs will show the member rejoining the cluster:
[0] Started Cluster Node on localhost...
[66250.672741109] CLUSTER: STATE_CHANGE [22/22]: memberId=0, INIT -> ACTIVE
[66250.687237677] CLUSTER: ELECTION_STATE_CHANGE [23/23]: memberId=0, INIT -> CANVASS
[66250.689468615] CLUSTER: NEW_LEADERSHIP_TERM [40/40]: logLeadershipTermId=6, leadershipTermId=6, logPosition=19424, timestamp=1579566538529, leaderMemberId=2, logSessionId=-1889384498
[66250.689493099] CLUSTER: ELECTION_STATE_CHANGE [34/34]: memberId=0, CANVASS -> FOLLOWER_REPLAY
onSessionOpen(ClientSession{id=1, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:19132', encodedPrincipal=[], clusteredServiceAgent=io.aeron.cluster.service.ClusteredServiceAgent@19a05123, responsePublication=null, isClosing=false})
attemptBid(this=Auction{bestPrice=0, currentWinningCustomerId=-1}, price=123,customerId=132)
onSessionClose(ClientSession{id=1, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:19132', encodedPrincipal=[], clusteredServiceAgent=io.aeron.cluster.service.ClusteredServiceAgent@19a05123, responsePublication=null, isClosing=false})
onSessionOpen(ClientSession{id=2, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:19132', encodedPrincipal=[], clusteredServiceAgent=io.aeron.cluster.service.ClusteredServiceAgent@19a05123, responsePublication=null, isClosing=false})
You can see that the restarted node looks like it is re-executing application messages. That is because it is: it is replaying the log of the messages that have been sent into the cluster up to this point in order to restore its in-memory state to what it was before the failure.
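Replay only works because the service is deterministic: the same initial state plus the same ordered log always yields the same result. A minimal sketch, assuming a hypothetical Auction that mirrors the bestPrice and currentWinningCustomerId fields printed in the log above, and assuming that a bid succeeds only if it is strictly higher than the current best price. That rule is consistent with the rejected lower bid in the client output, but the tutorial's handling of equal prices is not shown in this section.

```java
import java.util.List;

// Hypothetical model; the tutorial's real Auction class is defined elsewhere.
final class AuctionReplay
{
    record Bid(long customerId, long price)
    {
    }

    static final class Auction
    {
        long bestPrice = 0;
        long currentWinningCustomerId = -1;

        // Assumed rule: only a strictly higher price wins. No clocks or randomness, so it is deterministic.
        boolean attemptBid(final long price, final long customerId)
        {
            if (price <= bestPrice)
            {
                return false;
            }
            bestPrice = price;
            currentWinningCustomerId = customerId;
            return true;
        }
    }

    private AuctionReplay()
    {
    }

    // Recovery from a 'null' initial state: apply every logged input in order.
    static Auction replay(final List<Bid> log)
    {
        final Auction auction = new Auction();
        for (final Bid bid : log)
        {
            auction.attemptBid(bid.price(), bid.customerId());
        }
        return auction;
    }
}
```

Anything that made attemptBid depend on wall-clock time, random numbers or unordered iteration would break this property, and a replaying node could then diverge from the leader.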
Earlier in the tutorial we looked at how a node can store a snapshot of its current state so that it does not need to replay every single application message since the beginning of time. We can look at a node's recording log to see the logs and snapshots stored for it, and we can also use the tools to trigger a snapshot.
First, let's look at the recording log.
> java -cp <AERON_HOME>/build/libs/aeron-all-1.25.2-SNAPSHOT.jar \
    io.aeron.cluster.ClusterTool node0/consensus-module recording-log
This should show a recording log with a single entry:
RecordingLog{entries=[Entry{recordingId=0, leadershipTermId=0, termBaseLogPosition=0, logPosition=-1, timestamp=1579569542227, serviceId=-1, type=0, isValid=true, entryIndex=0}], cacheIndex={0=0}}
We can trigger a snapshot on the leader. This is slightly different from using the ClusterTool, in that ClusterControl needs the Aeron directory of the node’s media driver rather than the Consensus Module's data directory.
> java -cp ../../../aeron-all/build/libs/aeron-all-1.25.1-SNAPSHOT.jar \
    -Daeron.dir=/dev/shm/aeron-mike-<leaderMemberId>-driver/ \
    io.aeron.cluster.ClusterControl SNAPSHOT
The recording log will now have additional entries.
RecordingLog{entries=[Entry{recordingId=0, leadershipTermId=0, termBaseLogPosition=0, logPosition=-1, timestamp=1579569542227, serviceId=-1, type=0, isValid=true, entryIndex=0}, Entry{recordingId=1, leadershipTermId=0, termBaseLogPosition=0, logPosition=1376, timestamp=1579570195606, serviceId=0, type=1, isValid=true, entryIndex=1}, Entry{recordingId=2, leadershipTermId=0, termBaseLogPosition=0, logPosition=1376, timestamp=1579570195606, serviceId=-1, type=1, isValid=true, entryIndex=2}], cacheIndex={0=0}}
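There are two new entries, both of type 1 (a snapshot) and both at logPosition=1376: one for the clustered service's state (serviceId=0) and one for the Consensus Module's own state (serviceId=-1). If the node restarts, it can load these snapshots and start recovering from that position in the log rather than replaying the log from the beginning.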
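Recovery then amounts to picking the latest snapshot for each service and replaying only the log after its logPosition. The sketch below models the printed entries with a hypothetical record (not Aeron's RecordingLog class) and selects the latest snapshot per serviceId, using the type codes seen in the output (0 for a term, 1 for a snapshot) and serviceId -1 for the Consensus Module.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified model of the recording log entries printed by ClusterTool.
final class RecordingLogModel
{
    static final int ENTRY_TYPE_TERM = 0;
    static final int ENTRY_TYPE_SNAPSHOT = 1;
    static final int CONSENSUS_MODULE_SERVICE_ID = -1;

    record Entry(
        long recordingId,
        long leadershipTermId,
        long termBaseLogPosition,
        long logPosition,
        int serviceId,
        int type)
    {
    }

    private RecordingLogModel()
    {
    }

    // Latest snapshot for one service: the one taken at the highest log position.
    static Optional<Entry> latestSnapshot(final List<Entry> entries, final int serviceId)
    {
        return entries.stream()
            .filter((entry) -> ENTRY_TYPE_SNAPSHOT == entry.type() && serviceId == entry.serviceId())
            .max(Comparator.comparingLong(Entry::logPosition));
    }
}
```

With the three entries listed above, latestSnapshot(entries, 0) selects recordingId 1 and latestSnapshot(entries, -1) selects recordingId 2, both at logPosition 1376, which is where replay of the log would resume.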
Here are some exercises that you can try out.
-
Have multiple customers competing on the same auction.
-
Use multicast for the egress channel so that customers can see each other's prices and make better bids.
-
Implement a second message type that will allow customers to query for the current bid price of the auction.