Come realizzare una semplice applicazione con Akka Cluster

Se hai letto la mia storia precedente su Scalachain, probabilmente hai notato che è ben lungi dall'essere un sistema distribuito. Manca di tutte le funzionalità per funzionare correttamente con altri nodi. Aggiungete ad esso che una blockchain composta da un singolo nodo è inutile. Per questo ho deciso che è ora di lavorare sulla questione.

Dato che Scalachain è alimentato da Akka, perché non cogliere l'occasione per giocare con Akka Cluster? Ho creato un semplice progetto per armeggiare un po 'con Akka Cluster e in questa storia condividerò le mie conoscenze. Creeremo un cluster di tre nodi, utilizzando i router compatibili con il cluster per bilanciare il carico tra di loro. Tutto verrà eseguito in un container Docker e useremo docker-compose per una facile distribuzione.

Ok, andiamo! ?

Rapida introduzione ad Akka Cluster

Akka Cluster fornisce un ottimo supporto per la creazione di applicazioni distribuite. Il miglior caso d'uso è quando si dispone di un nodo che si desidera replicare N volte in un ambiente distribuito. Ciò significa che tutti gli N nodi sono peer che eseguono lo stesso codice. Akka Cluster ti offre la scoperta immediata dei membri nello stesso cluster. Utilizzando i router compatibili con il cluster è possibile bilanciare i messaggi tra attori in nodi diversi. È anche possibile scegliere la politica di bilanciamento, rendendo il bilanciamento del carico un gioco da ragazzi!

In realtà puoi scegliere tra due tipi di router:

Router di gruppo : gli attori a cui inviare i messaggi, chiamati route, vengono specificati utilizzando il percorso dell'attore. I router condividono le route create nel cluster. Useremo un router di gruppo in questo esempio.

Pool Router : le route vengono create e distribuite dal router, quindi sono i suoi figli nella gerarchia degli attori. I router non sono condivisi tra i router. Questo è l'ideale per uno scenario di replica primaria, in cui ogni router è il principale e il suo instradamento le repliche.

Questa è solo la punta dell'iceberg, quindi ti invito a leggere la documentazione ufficiale per ulteriori approfondimenti.

Un cluster per calcoli matematici

Immaginiamo uno scenario di caso d'uso. Supponiamo di progettare un sistema per eseguire calcoli matematici su richiesta. Il sistema viene distribuito online, quindi necessita di un'API REST per ricevere le richieste di calcolo. Un processore interno gestisce queste richieste, eseguendo il calcolo e restituendo il risultato.

In questo momento il processore può calcolare solo il numero di Fibonacci. Decidiamo di utilizzare un cluster di nodi per distribuire il carico tra i nodi e migliorare le prestazioni. Akka Cluster gestirà le dinamiche del cluster e il bilanciamento del carico tra i nodi. Ok suona bene!

Gerarchia dell'attore

Per prima cosa: dobbiamo definire la nostra gerarchia di attori. Il sistema può essere suddiviso in tre parti funzionali: la logica aziendale , la gestione del cluster e il nodo stesso. C'è anche il server, ma non è un attore, e ci lavoreremo in seguito.

Logica di business

L'applicazione dovrebbe eseguire calcoli matematici. Possiamo definire un semplice Processorattore per gestire tutti i compiti computazionali. Ogni calcolo che supportiamo può essere implementato in uno specifico attore, che sarà figlio di Processorquello. In questo modo l'applicazione è modulare e più facile da estendere e mantenere. In questo momento l'unico figlio di Processorsarà l' ProcessorFibonacciattore. Suppongo che tu possa indovinare qual è il suo compito. Questo dovrebbe essere sufficiente per iniziare.

Gestione dei cluster

Per gestire il cluster abbiamo bisogno di un file ClusterManager. Sembra semplice, vero? Questo attore gestisce tutto ciò che riguarda il cluster, come restituire i suoi membri quando richiesto. Sarebbe utile registrare ciò che accade all'interno del cluster, quindi definiamo un ClusterListenerattore. Questo è un figlio di ClusterManagere si iscrive agli eventi del cluster registrandoli.

Nodo

L' Nodeattore è la radice della nostra gerarchia. È il punto di ingresso del nostro sistema che comunica con l'API. The Processorand the ClusterManagersono i suoi figli, insieme ProcessorRouterall'attore. Questo è il bilanciatore del carico del sistema, distribuendo il carico tra Processors. Lo configureremo come un router compatibile con i cluster, quindi tutti ProcessorRouterpossono inviare messaggi a Processors su ogni nodo.

Attuazione dell'attore

È ora di implementare i nostri attori! Per prima cosa implementiamo gli attori legati alla logica di business del sistema. Si passa poi agli attori per la gestione del cluster e Nodeinfine all'attore root ( ).

ProcessoreFibonacci

Questo attore esegue il calcolo del numero di Fibonacci. Riceve un Computemessaggio contenente il numero da calcolare e il riferimento dell'attore a cui rispondere. Il riferimento è importante, poiché possono esserci diversi attori richiedenti. Ricorda che stiamo lavorando in un ambiente distribuito!

Una volta Computericevuto il messaggio, la fibonaccifunzione calcola il risultato. Lo avvolgiamo in un ProcessorResponseoggetto per fornire informazioni sul nodo che ha eseguito il calcolo. Ciò sarà utile in seguito per vedere la politica del round robin in azione.

Il risultato viene quindi inviato all'attore a cui dovremmo rispondere. Vai tranquillo.

object ProcessorFibonacci { sealed trait ProcessorFibonacciMessage case class Compute(n: Int, replyTo: ActorRef) extends ProcessorFibonacciMessage def props(nodeId: String) = Props(new ProcessorFibonacci(nodeId)) def fibonacci(x: Int): BigInt = { @tailrec def fibHelper(x: Int, prev: BigInt = 0, next: BigInt = 1): BigInt = x match { case 0 => prev case 1 => next case _ => fibHelper(x - 1, next, next + prev) } fibHelper(x) } } class ProcessorFibonacci(nodeId: String) extends Actor { import ProcessorFibonacci._ override def receive: Receive = { case Compute(value, replyTo) => { replyTo ! ProcessorResponse(nodeId, fibonacci(value)) } } }

Processore

L' Processorattore gestisce i sub-processori specifici, come quello di Fibonacci. Dovrebbe istanziare i sub-processori e inoltrare loro le richieste. In questo momento abbiamo solo un sub-processore, in modo che il Processorriceve un tipo di messaggio: ComputeFibonacci. Questo messaggio contiene il numero di Fibonacci da calcolare. Una volta ricevuto, il numero da calcolare viene inviato a a FibonacciProcessor, insieme al riferimento del file sender().

object Processor { sealed trait ProcessorMessage case class ComputeFibonacci(n: Int) extends ProcessorMessage def props(nodeId: String) = Props(new Processor(nodeId)) } class Processor(nodeId: String) extends Actor { import Processor._ val fibonacciProcessor: ActorRef = context.actorOf(ProcessorFibonacci.props(nodeId), "fibonacci") override def receive: Receive = { case ComputeFibonacci(value) => { val replyTo = sender() fibonacciProcessor ! Compute(value, replyTo) } } }

ClusterListener

Vorremmo registrare informazioni utili su ciò che accade nel cluster. Questo potrebbe aiutarci a eseguire il debug del sistema se necessario. Questo è lo scopo ClusterListenerdell'attore. Prima di iniziare, si iscrive ai messaggi di evento del cluster. I reagisce attore messaggi come MemberUp, UnreachableMembero MemberRemoved, registrazione dell'evento corrispondente. Quando ClusterListenerviene interrotto, si annulla l'iscrizione agli eventi del cluster.

object ClusterListener { def props(nodeId: String, cluster: Cluster) = Props(new ClusterListener(nodeId, cluster)) } class ClusterListener(nodeId: String, cluster: Cluster) extends Actor with ActorLogging { override def preStart(): Unit = { cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember]) } override def postStop(): Unit = cluster.unsubscribe(self) def receive = { case MemberUp(member) => log.info("Node {} - Member is Up: {}", nodeId, member.address) case UnreachableMember(member) => log.info(s"Node {} - Member detected as unreachable: {}", nodeId, member) case MemberRemoved(member, previousStatus) => log.info(s"Node {} - Member is Removed: {} after {}", nodeId, member.address, previousStatus) case _: MemberEvent => // ignore } }

ClusterManager

L'attore responsabile della gestione del cluster è ClusterManager. Crea l' ClusterListenerattore e fornisce l'elenco dei membri del cluster su richiesta. Potrebbe essere esteso per aggiungere più funzionalità, ma in questo momento è sufficiente.

object ClusterManager { sealed trait ClusterMessage case object GetMembers extends ClusterMessage def props(nodeId: String) = Props(new ClusterManager(nodeId)) } class ClusterManager(nodeId: String) extends Actor with ActorLogging { val cluster: Cluster = Cluster(context.system) val listener: ActorRef = context.actorOf(ClusterListener.props(nodeId, cluster), "clusterListener") override def receive: Receive = { case GetMembers => { sender() ! cluster.state.members.filter(_.status == MemberStatus.up) .map(_.address.toString) .toList } } }

ProcessorRouter

The load-balancing among processors is handled by the ProcessorRouter. It is created by the Node actor, but this time all the required information are provided in the configuration of the system.

class Node(nodeId: String) extends Actor { //... val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter") //... }

Let’s analyse the relevant part in the application.conf file.

akka { actor { ... deployment { /node/processorRouter { router = round-robin-group routees.paths = ["/user/node/processor"] cluster { enabled = on allow-local-routees = on } } } } ... }

The first thing is to specify the path to the router actor, that is /node/processorRouter. Inside that property we can configure the behaviour of the router:

  • router: this is the policy for the load balancing of messages. I chose the round-robin-group, but there are many others.
  • routees.paths: these are the paths to the actors that will receive the messages handled by the router. We are saying: “When you receive a message, look for the actors corresponding to these paths. Choose one according to the policy and forward the message to it.” Since we are using Cluster Aware Routers, the routees can be on any node of the cluster.
  • cluster.enabled: are we operating in a cluster? The answer is on, of course!
  • cluster.allow-local-routees: here we are allowing the router to choose a routee in its node.

Using this configuration we can create a router to load balance the work among our processors.

Node

The root of our actor hierarchy is the Node. It creates the children actors — ClusterManager, Processor, and ProcessorRouter — and forwards the messages to the right one. Nothing complex here.

object Node { sealed trait NodeMessage case class GetFibonacci(n: Int) case object GetClusterMembers def props(nodeId: String) = Props(new Node(nodeId)) } class Node(nodeId: String) extends Actor { val processor: ActorRef = context.actorOf(Processor.props(nodeId), "processor") val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter") val clusterManager: ActorRef = context.actorOf(ClusterManager.props(nodeId), "clusterManager") override def receive: Receive = { case GetClusterMembers => clusterManager forward GetMembers case GetFibonacci(value) => processorRouter forward ComputeFibonacci(value) } }

Server and API

Every node of our cluster runs a server able to receive requests. The Server creates our actor system and is configured through the application.conf file.

object Server extends App with NodeRoutes { implicit val system: ActorSystem = ActorSystem("cluster-playground") implicit val materializer: ActorMaterializer = ActorMaterializer() val config: Config = ConfigFactory.load() val address = config.getString("http.ip") val port = config.getInt("http.port") val nodeId = config.getString("clustering.ip") val node: ActorRef = system.actorOf(Node.props(nodeId), "node") lazy val routes: Route = healthRoute ~ statusRoutes ~ processRoutes Http().bindAndHandle(routes, address, port) println(s"Node $nodeId is listening at //$address:$port") Await.result(system.whenTerminated, Duration.Inf) }

Akka HTTP powers the server itself and the REST API, exposing three simple endpoints. These endpoints are defined in the NodeRoutes trait.

The first one is /health, to check the health of a node. It responds with a 200 OK if the node is up and running

lazy val healthRoute: Route = pathPrefix("health") { concat( pathEnd { concat( get { complete(StatusCodes.OK) } ) } ) }

The /status/members endpoint responds with the current active members of the cluster.

lazy val statusRoutes: Route = pathPrefix("status") { concat( pathPrefix("members") { concat( pathEnd { concat( get { val membersFuture: Future[List[String]] = (node ? GetClusterMembers).mapTo[List[String]] onSuccess(membersFuture) { members => complete(StatusCodes.OK, members) } } ) } ) } ) }

The last (but not the least) is the /process/fibonacci/n endpoint, used to request the Fibonacci number of n.

lazy val processRoutes: Route = pathPrefix("process") { concat( pathPrefix("fibonacci") { concat( path(IntNumber) { n => pathEnd { concat( get { val processFuture: Future[ProcessorResponse] = (node ? GetFibonacci(n)).mapTo[ProcessorResponse] onSuccess(processFuture) { response => complete(StatusCodes.OK, response) } } ) } } ) } ) }

It responds with a ProcessorResponse containing the result, along with the id of the node where the computation took place.

Cluster Configuration

Once we have all our actors, we need to configure the system to run as a cluster! The application.conf file is where the magic takes place. I’m going to split it in pieces to present it better, but you can find the complete file here.

Let’s start defining some useful variables.

clustering { ip = "127.0.0.1" ip = ${?CLUSTER_IP} port = 2552 port = ${?CLUSTER_PORT} seed-ip = "127.0.0.1" seed-ip = ${?CLUSTER_SEED_IP} seed-port = 2552 seed-port = ${?CLUSTER_SEED_PORT} cluster.name = "cluster-playground" }

Here we are simply defining the ip and port of the nodes and the seed, as well as the cluster name. We set a default value, then we override it if a new one is specified. The configuration of the cluster is the following.

akka { actor { provider = "cluster" ... /* router configuration */ ... } remote { log-remote-lifecycle-events = on netty.tcp { hostname = ${clustering.ip} port = ${clustering.port} } } cluster { seed-nodes = [ "akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port} ] auto-down-unreachable-after = 10s } } ... /* server vars */ ... /* cluster vars */ }

Akka Cluster is build on top of Akka Remoting, so we need to configure it properly. First of all, we specify that we are going to use Akka Cluster saying that provider = "cluster". Then we bind cluster.ip and cluster.port to the hostname and port of the netty web framework.

The cluster requires some seed nodes as its entry points. We set them in the seed-nodes array, in the format akka.tcp://"{clustering.cluster.name}"@"{clustering.seed-ip}":”${clustering.seed-port}”. Right now we have one seed node, but we may add more later.

The auto-down-unreachable-after property sets a member as down after it is unreachable for a period of time. This should be used only during development, as explained in the official documentation.

Ok, the cluster is configured, we can move to the next step: Dockerization and deployment!

Dockerization and deployment

To create the Docker container of our node we can use sbt-native-packager. Its installation is easy: add addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15") to the plugin.sbt file in the project/ folder. This amazing tool has a plugin for the creation of Docker containers. it allows us to configure the properties of our Dockerfile in the build.sbt file.

// other build.sbt properties enablePlugins(JavaAppPackaging) enablePlugins(DockerPlugin) enablePlugins(AshScriptPlugin) mainClass in Compile := Some("com.elleflorio.cluster.playground.Server") dockerBaseImage := "java:8-jre-alpine" version in Docker := "latest" dockerExposedPorts := Seq(8000) dockerRepository := Some("elleflorio")

Once we have setup the plugin, we can create the docker image running the command sbt docker:publishLocal. Run the command and taste the magic… ?

We have the Docker image of our node, now we need to deploy it and check that everything works fine. The easiest way is to create a docker-compose file that will spawn a seed and a couple of other nodes.

version: '3.5' networks: cluster-network: services: seed: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '2552:2552' - '8000:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: seed CLUSTER_SEED_IP: seed node1: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '8001:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: node1 CLUSTER_PORT: 1600 CLUSTER_SEED_IP: seed CLUSTER_SEED_PORT: 2552 node2: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '8002:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: node2 CLUSTER_PORT: 1600 CLUSTER_SEED_IP: seed CLUSTER_SEED_PORT: 2552

I won’t spend time going through it, since it is quite simple.

Let’s run it!

Time to test our work! Once we run the docker-compose up command, we will have a cluster of three nodes up and running. The seed will respond to requests at port :8000, while node1 and node2 at port :8001 and :8002. Play a bit with the various endpoints. You will see that the requests for a Fibonacci number will be computed by a different node each time, following a round-robin policy. That’s good, we are proud of our work and can get out for a beer to celebrate! ?

Conclusion

We are done here! We learned a lot of things in these ten minutes:

  • What Akka Cluster is and what can do for us.
  • How to create a distributed application with it.
  • How to configure a Group Router for load-balancing in the cluster.
  • How to Dockerize everything and deploy it using docker-compose.

You can find the complete application in my GitHub repo. Feel free to contribute or play with it as you like! ?

See you! ?