Objectives
Use this pattern to build a load-balanced Akka system.
Run each node of this pattern inside a Docker container, so that new nodes can be deployed easily.
Pattern overview
You can refer to this pattern in detail in this post.
(Or just look at the picture in that post.)
In general, this pattern has the following features:
The system consists of one master and several workers.
The master receives tasks and sends them to the workers.
Workers work on and finish these tasks.
The master tracks the status of the workers and only sends tasks to workers that are not currently working. (By doing so, we achieve load balancing.)
After a worker finishes its task, it sends a message back to the master, telling it that it is idle again.
To achieve our objectives, we are going to:
Make the master into a container.
Make a group of workers into another container.
After deploying the containers above, start another group of workers in a third container. (This confirms hot scale-out.)
Implementation
Given the features described above, writing the master and the worker is pretty straightforward. You can find the full source code for this pattern at the bottom of that post.
Here is my simplified version of the system:
TaskWorker
package actors

import actors.TaskMaster._
import actors.TaskWorker.WorkFinished
import akka.actor.{ Actor, ActorLogging, ActorPath, ActorRef }

object TaskWorker {
  case class WorkFinished(isFinished: Boolean = true)
}

abstract class TaskWorker(masterPath: ActorPath) extends Actor with ActorLogging {

  val master = context.actorSelection(masterPath)

  // Register with the master as soon as this worker starts.
  override def preStart() = master ! NewWorker(self)

  // TODO: add postRestart function for better robustness

  // While working, the only message we react to is our own WorkFinished,
  // which reports the result to the master and switches us back to idle.
  def working(task: Any, masterRef: ActorRef): Receive = {
    case WorkFinished(isFinished) =>
      log.debug("Worker finished task.")
      masterRef ! TaskResponse(self, isFinished)
      context.become(idle)
  }

  def idle: Receive = {
    case TaskIncoming =>
      master ! TaskRequest(self)
    case TaskTicket(task, taskOwner) =>
      log.debug("Worker got a task.")
      context.become(working(task, sender()))
      work(taskOwner, task)
    case TaskConsumedOut =>
      log.debug("Worker did not get any task.")
  }

  def receive = idle

  def finish(): Unit = {
    self ! WorkFinished(isFinished = true)
  }

  def fail(): Unit = {
    self ! WorkFinished(isFinished = false)
  }

  // Implement this with the actual processing logic.
  def work(taskOwner: Option[ActorRef], task: Any): Unit

  // define this method if you want to deal with the result of the processing
  // def submitWork(workResult: WorkerResult): Unit
}
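The startup code later in this post refers to a DemoWorker. The concrete worker lives in the linked repository; a minimal sketch of one, with an illustrative work body that just logs the task and reports success, could look like this:

package actors

import akka.actor.{ ActorPath, ActorRef }

// Minimal concrete worker: logs the task, then calls finish(), which
// sends WorkFinished(true) to self and lets working() notify the master.
class DemoWorker(masterPath: ActorPath) extends TaskWorker(masterPath) {
  def work(taskOwner: Option[ActorRef], task: Any): Unit = {
    log.info(s"Processing $task")
    finish()
  }
}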
TaskMaster
package actors

import actors.TaskMaster._
import akka.actor.{ Actor, ActorLogging, ActorRef }

import scala.collection.mutable

// companion object for the protocol
object TaskMaster {
  case class NewWorker(worker: ActorRef)
  case class TaskList(taskList: List[Any])
  case object TaskIncoming
  case class TaskResponse(worker: ActorRef, isFinished: Boolean = true)
  case class TaskRequest(worker: ActorRef)
  case object TaskConsumedOut
  // The ticket carries the original sender of the task, so that the worker
  // knows who sent it and can return the result directly to that sender.
  // Change the type of `task` to something more specific, or define your own task type.
  case class TaskTicket(task: Any, taskOwner: Option[ActorRef])
}

class TaskMaster extends Actor with ActorLogging {

  val taskQueue = mutable.Queue.empty[Any]
  // maps every known worker to the ticket it is working on (None = idle)
  val workers = mutable.Map.empty[ActorRef, Option[TaskTicket]]

  override def receive: Receive = {
    // when a new worker spawns
    case NewWorker(worker) =>
      context.watch(worker)
      workers += (worker -> None)
      notifyFreeWorker()
    // when a worker sends a task result back
    case TaskResponse(worker, isFinished) =>
      if (isFinished)
        log.debug("Task is finished.")
      else
        log.debug("Task failed to finish.")
      workers += (worker -> None)
      self ! TaskRequest(worker)
    // when a worker asks for a task
    case TaskRequest(worker) =>
      if (workers.contains(worker)) {
        if (taskQueue.isEmpty)
          worker ! TaskConsumedOut
        else {
          if (workers(worker).isEmpty) {
            val task = taskQueue.dequeue()
            assignTask(task, worker)
          } else {
            // this should never happen
            log.error("A worker is requesting a task while still processing one.")
          }
        }
      }
    // when new tasks arrive
    case TaskList(taskList) =>
      taskQueue.enqueue(taskList: _*)
      notifyFreeWorker()
  }

  def assignTask(task: Any, worker: ActorRef) = {
    workers += (worker -> Some(TaskTicket(task, Some(self))))
    worker ! TaskTicket(task, Some(self))
  }

  def notifyFreeWorker() = {
    if (taskQueue.nonEmpty)
      workers.foreach {
        case (worker, None) => worker ! TaskIncoming
        case _ => // busy workers will ask again via TaskRequest when they finish
      }
  }
}
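Feeding work to the master is then just a matter of sending it a TaskList; for example (a hypothetical snippet, assuming master is an ActorRef to a TaskMaster as created in the next section):

// Enqueue three string tasks; every idle worker is notified at once.
master ! TaskMaster.TaskList(List("task-1", "task-2", "task-3"))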
Start the system
Our system has two entry modes: one starts the master, and the other starts a group of workers that connect to the master.
import java.util.UUID
import java.util.concurrent.TimeUnit

import akka.actor.{ ActorPath, ActorSystem, Props }
import akka.routing.RoundRobinRouter
import com.typesafe.config.ConfigFactory
import actors._

import scala.concurrent.duration.Duration

object StartNode {

  val systemName = "load-balance"
  val systemConfigKey = "load-balance"
  val systemNodeConfigKey = "node"
  val masterNodeConfigKey = "master"
  val workerNodeConfigKey = "worker"
  val workerNodePopulationConfigKey = "population"
  val configHostKey = "host"
  val configPortKey = "port"
  val masterActorName = "master"

  // pattern used to build the master's remote actor path
  val masterPathPattern = s"akka.tcp://$systemName@%s:%s/user/$masterActorName"

  def main(args: Array[String]): Unit = {
    if (args.length == 1) {
      args(0) match {
        case "worker" =>
          startAsWorkerGroup()
        case "master" =>
          startAsMaster()
        case _ =>
          println(s"Can not parse start mode: ${args(0)}")
      }
    } else {
      println("Please choose a start mode.")
    }
  }

  def startAsWorkerGroup(): Unit = {
    println("Starting actor system ...")
    val system = ActorSystem(systemName, ConfigFactory.load.getConfig(systemConfigKey))
    val workerNodeConfig = ConfigFactory.load.getConfig(systemNodeConfigKey).getConfig(workerNodeConfigKey)
    val masterNodeConfig = ConfigFactory.load.getConfig(systemNodeConfigKey).getConfig(masterNodeConfigKey)

    println("Parsing worker config ...")
    // number of workers to spawn in this node
    val workerCounter = workerNodeConfig.getInt(workerNodePopulationConfigKey)

    println("Connecting to master ...")
    val masterHost = masterNodeConfig.getString(configHostKey)
    val masterPort = masterNodeConfig.getInt(configPortKey)
    val masterLocation = masterPathPattern.format(masterHost, masterPort)
    println(s"at location $masterLocation")

    // Note: actorSelection only throws on a malformed path; it does not
    // open a connection, so this check cannot detect an unreachable master.
    val masterOpt = try {
      Option(system.actorSelection(masterLocation))
    } catch {
      case _: Throwable => Option.empty
    }

    if (masterOpt.isEmpty) {
      println("Can not connect to master node!")
    } else {
      println("Workers started!")
      val workerGroup = system.actorOf(
        Props(new DemoWorker(ActorPath.fromString(masterLocation)))
          .withRouter(RoundRobinRouter(workerCounter)))
    }
  }

  def startAsMaster(): Unit = {
    println("Master starting ...")
    println("Starting actor system ...")
    val system = ActorSystem(systemName, ConfigFactory.load.getConfig(systemConfigKey))

    println("Spawning master ...")
    val master = system.actorOf(Props(new TaskMaster()), name = masterActorName)

    /* Test the system with a scheduled job that keeps sending tasks to the master */
    val scheduler = system.scheduler
    val task = new Runnable {
      def run() {
        // create a random task; it must be wrapped in TaskList to match
        // the master's receive
        val task = s"Task:${UUID.randomUUID().toString}"
        master ! TaskMaster.TaskList(List(task))
      }
    }
    implicit val executor = system.dispatcher
    scheduler.schedule(
      initialDelay = Duration(2, TimeUnit.SECONDS),
      interval = Duration(1, TimeUnit.SECONDS),
      runnable = task)
  }
}
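Note that actorSelection never fails just because the remote host is down; it only throws for a malformed path, so the try/catch above cannot really verify the master. If your Akka version provides ActorSelection.resolveOne (Akka 2.3+), a sketch of a stricter check could be:

import akka.actor.ActorNotFound
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Resolve the master's ActorRef up front; resolveOne fails with
// ActorNotFound if nothing is listening at the remote path.
implicit val timeout: Timeout = Timeout(5.seconds)
val masterRefOpt = try {
  Some(Await.result(system.actorSelection(masterLocation).resolveOne(), 6.seconds))
} catch {
  case _: ActorNotFound => None
}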
Configuration
There are basically two kinds of things that should be set in the config file:
The binding information of the local actor system.
The location of the remote actor system to connect to.
Since the master learns each worker's location when the worker sends its registration message,
we only need to set the location of the master.
An example config for the master node:
load-balance { // must match the key loaded by ConfigFactory.load.getConfig in StartNode
  akka {
    // logging settings
    loglevel = "DEBUG"
    // change log handler
    loggers = ["akka.event.slf4j.Slf4jLogger"]
    scheduler {
      tick-duration = 50ms
      ticks-per-wheel = 128
    }
    // actor provider: remote
    actor {
      provider = "akka.remote.RemoteActorRefProvider"
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      log-sent-messages = on
      log-received-messages = on
      // hostname and port of the current system;
      // in a worker's config file, change these to the worker's host and port
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2550
      }
    }
  }
}
node {
  master {
    id = "test"
    activity-chunk-size = 300
    host = "127.0.0.1"
    port = 2550
  }
  worker {
    host = "127.0.0.1"
    port = 2552
    population = 20
  }
}
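A worker node's config file is the same except for the netty.tcp block, which binds the worker's own address; a sketch, reusing the worker port from the node block above:

// in the worker's config file, inside load-balance.akka.remote:
netty.tcp {
  hostname = "127.0.0.1"
  port = 2552
}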
Integrate with Docker
Make a native package
Add the sbt-native-packager plugin to your project/plugins.sbt file:
// SBT Native Packager Plugin
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "0.8.0-M2")
Add the following line to your build.sbt file to specify the package type:
packageArchetype.java_server
Run the following command to build your deploy package:
sbt clean stage universal:packageZipTarball
Dockerfile
FROM java:7-jre
MAINTAINER Chris Kong <chris.kong.cn@gmail.com>
RUN mkdir -p /home/kong
ENV USER_HOME /home/kong
# Copy your package into the container
COPY target/universal/stage/bin $USER_HOME/bin
COPY target/universal/stage/lib $USER_HOME/lib
# Logs and config are provided by the host at runtime via the volumes
# mounted under /mnt (see `docker run` below); symlink them into place.
RUN ln -sfn /mnt/logs $USER_HOME/logs
RUN ln -sfn /mnt/config $USER_HOME/conf
ENV LANG C.UTF-8
WORKDIR $USER_HOME
# container entry point: the start script generated by sbt-native-packager
ENTRYPOINT [ "bin/scala-akka-loadbalance-pattern-with-docker" ]
Build the image
sbt clean stage universal:packageZipTarball
# the trailing dot is the build context
docker build -f Dockerfile -t load-balance-pattern .
Run the container
There is only one thing to remember when starting the container: use the host's network interface by specifying --net=host.
For master:
docker run --name=load-balance-master -d \
--net=host \
-v <your-log-folder>:/mnt/logs \
-v <your-master-config-folder>:/mnt/config \
load-balance-pattern master
For worker:
docker run --name=load-balance-worker -d \
--net=host \
-v <your-log-folder>:/mnt/logs \
-v <your-worker-config-folder>:/mnt/config \
load-balance-pattern worker
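To confirm the hot scale-out from the objectives, you can start a second worker group at any time by running the same image again. The container name and config folder below are illustrative; the second config needs its own netty.tcp port:
docker run --name=load-balance-worker-2 -d \
--net=host \
-v <your-log-folder>:/mnt/logs \
-v <your-second-worker-config-folder>:/mnt/config \
load-balance-pattern worker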
Once the commands above are executed, the master's log file will show messages indicating that the workers have connected to the master.
And that is how to use Docker and Akka to build a load-balanced distributed system.
When deploying onto AWS EC2 instances, make sure your master and workers are in the same VPC network.
Improvements
Currently, each role needs a different config file for deployment, which is annoying. You can use --env-file and -e to assemble the config inside the container, so that role-based config files are no longer needed and a single config template suffices.
Adding worker nodes to the master at runtime works fine, but disconnecting a worker node does not: the master keeps the dead worker in its map. We can add a mechanism to detect dying workers and remove them, as sketched below.
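Since the master already calls context.watch on every registering worker, one way to do this (a sketch, not part of the original code) is to handle the Terminated message inside TaskMaster.receive:

import akka.actor.Terminated

// Additional case for TaskMaster.receive: drop the dead worker and
// re-queue its in-flight task, if it had one.
case Terminated(worker) =>
  workers.get(worker).flatten.foreach(ticket => taskQueue.enqueue(ticket.task))
  workers -= worker
  notifyFreeWorker()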
You can find the full source code at:
https://github.com/visualskyrim/scala-akka-loadbalance-pattern-with-docker