This project contains an example implementation of cluster sharding. Here we will focus on the implementation details in this project. Please see the Akka documentation for a more detailed discussion about cluster sharding.

What is cluster sharding

According to the Akka documentation:

Cluster sharding is useful when you need to distribute actors across several nodes in the cluster and want to be able to interact with them using their logical identifier, but without having to care about their physical location in the cluster, which might also change over time.

The common usage for cluster sharding is to distribute and engage with individual actors across the cluster. Each of these distributed actors is used to handle messages that are intended for a specific entity. Each entity represents a thing, such as a bank account or a shopping cart. Entities each have a unique identifier, such as an account or shopping cart identifier.

In this example project, the entities represent simple identifier and value. In a real application, entities represent real things, such as bank accounts. Each entity handles incoming messages. These messages are either commands, which are requests to chage the state of the entity. Other messages are query requests that are used to retrieve entity information.

Two actors are used to simulate clients that are sending messages to entities. The EntityCommandActor and the EntityQueryActor randomly generate messages to specific entities. These two actors are used to simulate incoming service requests. In a real implementation, the service would receive incoming messages, for example from an HTTP request, and forward those messages to specific entities to handle the request messages.

The process of forwarding these messages to the right entities, which could be distributed across multiple JVMs running in a cluster, is handled by cluster sharding. To send a message to an entity the sender simply sends the message to a shard region actor. The shard region actor is responsible for forwarding the message to the correct entity actor. The actual mechanics of this process is described in the “How it works” section of the cluster sharding documentation.

Visualization of cluster sharding Figure 1. Visualization of cluster sharding

The visualization in Figure 1 shows an example of cluster sharding. The blue leaf actors represent the entity actors. Each entity actor represents the state of an entity. The green circles that connect to the entity circles represent the running shard actors. In the example system there 15 shards configured. The shards connect to the orange shard region actors. These orange circles also represent other actors, such as the entity command and query actors. Also, the orange circles represent the root of the actor system on each cluster node.

How it works

The Runner class contains the main method. The main method starts one or more Akka systems and in each actor system it starts instances of multiple actors.

The arguments passed to the main method are expected to be zero or more port numbers. These port numbers will be used to start cluster nodes, one for each specified port.

If no ports are specified a default is used to start three JVMs using ports 2551, 2552, and 0 respectively.

if (args.length == 0) {
    startupClusterNodes(Arrays.asList("2551", "2552", "0"));
} else {
    startupClusterNodes(Arrays.asList(args));
}

Multiple actor systems may be started in a single JVM. However, the typical use case is that a single actor system is started per JVM. One way to think of an actor system is that they are supercharged thread pools.

The startupClusterNodes method is called with the list of specified port numbers. Each port is used to start an actor system and then start up various actors that will run in the demonstration.

The most notable actor in this cluster sharding example is the shardRegion actor.

ActorRef shardingRegion = setupClusterSharding(actorSystem);

This actor is instantiated in the setupClusterSharding method.

private static ActorRef setupClusterSharding(ActorSystem actorSystem) {
    ClusterShardingSettings settings = ClusterShardingSettings.create(actorSystem);
    return ClusterSharding.get(actorSystem).start(
            "entity",
            EntityActor.props(),
            settings,
            EntityMessage.messageExtractor()
    );
}

This method uses the ClusterSharding static get method to create an instance of a single shard region actor per actor system. More details on how the shard region actors are used is described above. The get method is used to create a shard region actor passing it the code to be used to create an instance of an entity actor (EntityActor.props()) and the code used to extract entity and shard identifiers from messages that are sent to entity actors (EntityMessage.messageExtractor()).

actorSystem.actorOf(EntityCommandActor.props(shardingRegion), "entityCommand");
actorSystem.actorOf(EntityQueryActor.props(shardingRegion), "entityQuery");

The shardRegion actor reference is passed as a constructor argument to the EntityCommandActor and the EntityQueryActor. These generate simulated random message traffic, they use the shardRegion actor ref to send messages to specific entity actors.

shardRegion.tell(command(), self());

The shardRegion actor handles the heavy lifting of routing each message to the correct entity actor.

Entity actors have an interesting life-cycle. When messages are sent to a shard region actor, it routes the message to a shard actor that is responsible for the specific entity as defined by the message entity identifier.

The shared region actor is responsible for handling the routing of entity messages to the specific shard actors, which may involve other cluster sharding internal actors, and this may include forwarding the message from one cluster node to another.

When a shard actor receives an incoming entity message, it checks to see if the entity actor instance exits. If the entity actor instance does not exist, then an instance of the entity actor is created, and the message is forwarded to the newly started entity actor instance. If the entity actor instance already exists, then the message is forwarded from the shard actor to the specific entity actor instance.

Here is the source code of our example entity actor.

package cluster.sharding;

import akka.actor.AbstractLoggingActor;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.cluster.sharding.ShardRegion;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import java.util.concurrent.TimeUnit;

class EntityActor extends AbstractLoggingActor {
    private Entity entity;
    private final FiniteDuration receiveTimeout = Duration.create(60, TimeUnit.SECONDS);

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(EntityMessage.Command.class, this::command)
                .match(EntityMessage.Query.class, this::query)
                .matchEquals(ReceiveTimeout.getInstance(), t -> passivate())
                .build();
    }

    private void command(EntityMessage.Command command) {
        log().info("{} <- {}", command, sender());
        if (entity == null) {
            entity = command.entity;
            final EntityMessage.CommandAck commandAck = EntityMessage.CommandAck.ackInit(command);
            log().info("{}, {} -> {}", commandAck, command, sender());
            sender().tell(commandAck, self());
        } else {
            entity.value = command.entity.value;
            final EntityMessage.CommandAck commandAck = EntityMessage.CommandAck.ackUpdate(command);
            log().info("{}, {} -< {}", commandAck, command, sender());
            sender().tell(commandAck, self());
        }
    }

    private void query(EntityMessage.Query query) {
        log().info("{} <- {}", query, sender());
        if (entity == null) {
            final EntityMessage.QueryAckNotFound queryAck = EntityMessage.QueryAckNotFound.ack(query);
            log().info("{} -> {}", queryAck, sender());
            sender().tell(queryAck, self());
        } else {
            final EntityMessage.QueryAck queryAck = EntityMessage.QueryAck.ack(query, entity);
            log().info("{} -> {}", queryAck, sender());
            sender().tell(queryAck, self());
        }
    }

    private void passivate() {
        context().parent().tell(new ShardRegion.Passivate(PoisonPill.getInstance()), self());
    }

    @Override
    public void preStart() {
        log().info("Start");
        context().setReceiveTimeout(receiveTimeout);
    }

    @Override
    public void postStop() {
        log().info("Stop {}", entity == null ? "(not initialized)" : entity.id);
    }

    static Props props() {
        return Props.create(EntityActor.class);
    }
}

Entity actors are typically set up to shut themselves down when they stop receiving messages.

@Override
public void preStart() {
    log().info("Start");
    context().setReceiveTimeout(receiveTimeout);
}

The timeout period is set via a call to the SetReceiveTimeout(...) method. What this does is whenever the entity actor receives a message the timeout timer is reset.

@Override
public Receive createReceive() {
    return receiveBuilder()
            .match(EntityMessage.Command.class, this::command)
            .match(EntityMessage.Query.class, this::query)
            .matchEquals(ReceiveTimeout.getInstance(), t -> passivate())
            .build();
}

When no messages are received before the timeout period has expired then the entity actor is set a ReceiveTimeout message. In our example entity actor a receive timeout message triggers a call to a method called passivate().

private void passivate() {
    context().parent().tell(new ShardRegion.Passivate(PoisonPill.getInstance()), self());
}

In the passivate() method a message is sent to the entity actor’s parent actor, which is the shard actor, asking it to trigger a shutdown of this entity actor.

Installation

git clone https://github.com/mckeeh3/akka-java-cluster-sharding.git
cd akka-java-cluster-sharding
mvn clean package

The Maven command builds the project and creates a self contained runnable JAR.

Run a cluster

Run a cluster on Mac, Linux

The project contains a set of scripts that can be used to start and stop individual cluster nodes or start and stop a cluster of nodes.

The main script ./akka is provided to run a cluster of nodes or start and stop individual nodes. Use ./akka node start [1-9] | stop to start and stop individual nodes and ./akka cluster start [1-9] | stop to start and stop a cluster of nodes. The cluster and node start options will start Akka nodes on ports 2551 through 2559. Both stdin and stderr output is sent to a file in the /tmp directory using the file naming convention /tmp/<project-dir-name>-N.log.

Start node 1 on port 2551 and node 2 on port 2552.

./akka node start 1
./akka node start 2

Stop node 3 on port 2553.

./akka node stop 3

Start a cluster of four nodes on ports 2551, 2552, 2553, and 2554.

./akka cluster start 4

Stop all currently running cluster nodes.

./akka cluster stop

You can use the ./akka cluster start [1-9] script to start multiple nodes and then use ./akka node start [1-9] and ./akka node stop [1-9] to start and stop individual nodes.

Use the ./akka node tail [1-9] command to tail -f a log file for nodes 1 through 9.

The ./akka cluster status command displays the status of a currently running cluster in JSON format using the Akka Management extension Cluster Http Management.

Run a cluster on Windows, command line

The following Maven command runs a signle JVM with 3 Akka actor systems on ports 2551, 2552, and a radmonly selected port.

mvn exec:java

Use CTRL-C to stop.

To run on specific ports use the following -D option for passing in command line arguements.

mvn exec:java -Dexec.args="2551"

The default no arguments is equilevalant to the following.

mvn exec:java -Dexec.args="2551 2552 0"

A common way to run tests is to start single JVMs in multiple command windows. This simulates running a multi-node Akka cluster.

For example, run the following 4 commands in 4 command windows.

mvn exec:java -Dexec.args="2551" > /tmp/$(basename $PWD)-1.log
mvn exec:java -Dexec.args="2552" > /tmp/$(basename $PWD)-2.log
mvn exec:java -Dexec.args="0" > /tmp/$(basename $PWD)-3.log
mvn exec:java -Dexec.args="0" > /tmp/$(basename $PWD)-4.log

This runs a 4 node Akka cluster starting 2 nodes on ports 2551 and 2552, which are the cluster seed nodes as configured and the application.conf file.

And 2 nodes on randomly selected port numbers.

The optional redirect > /tmp/$(basename $PWD)-4.log is an example for pushing the log output to filenames based on the project direcctory name.

For convenience, in a Linux command shell define the following aliases.

alias p1='cd ~/akka-java/akka-java-cluster'
alias p2='cd ~/akka-java/akka-java-cluster-aware'
alias p3='cd ~/akka-java/akka-java-cluster-singleton'
alias p4='cd ~/akka-java/akka-java-cluster-sharding'
alias p5='cd ~/akka-java/akka-java-cluster-persistence'
alias p6='cd ~/akka-java/akka-java-cluster-persistence-query'

alias m1='clear ; mvn exec:java -Dexec.args="2551" > /tmp/$(basename $PWD)-1.log'
alias m2='clear ; mvn exec:java -Dexec.args="2552" > /tmp/$(basename $PWD)-2.log'
alias m3='clear ; mvn exec:java -Dexec.args="0" > /tmp/$(basename $PWD)-3.log'
alias m4='clear ; mvn exec:java -Dexec.args="0" > /tmp/$(basename $PWD)-4.log'

The p1-6 alias commands are shortcuts for cd’ing into one of the six project directories. The m1-4 alias commands start and Akka node with the appropriate port. Stdout is also redirected to the /tmp directory.