Akka in 100 lines - Actors and Message Passing
Here is a quick program in Akka (2.2, in case it matters). The program is kept to 100 lines so that it is easy to follow, which is also why there are many println statements and a few unused variables. It's a starting point for getting something bigger running!
Akka uses the actor model and message passing to run a distributed processing system. This model has been a standard approach in high-performance computing for decades (see MPI, which grew out of the message-passing systems of the 1980s, or the similar, although heavier, idea of fork() from Unix in the 1970s). Akka brings an excellent model of this to Java in a more native fashion (although MPI bindings for Java also exist). Message passing is a different approach to concurrent processing than multi-threading. Threads share memory, which brings locking and the risk of deadlock on the shared objects, and they are harder to distribute because multi-threading usually stays within one JVM. With message passing, the workers are independent and each has its own memory. (fork() is somewhat different in that the child starts with a copy of the parent's memory, which message-passing workers usually do not.)
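To make the contrast concrete, here is a minimal sketch in plain Java, with no Akka involved. It is only an illustration of the idea, not how Akka is implemented: the names MessagePassingSketch, Result, and collect are made up for this example, and a BlockingQueue stands in for a mailbox. Each worker keeps its state to itself and communicates only by handing over an immutable message, so no locks on shared objects are needed in the worker code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-JDK sketch of message passing (hypothetical names, not Akka's API).
final class MessagePassingSketch {

    // Immutable message: once sent, nobody can modify it.
    static final class Result {
        final int workerId;
        final int value;

        Result(int workerId, int value) {
            this.workerId = workerId;
            this.value = value;
        }
    }

    // Starts the workers, waits for one Result from each, and returns the plain sum.
    static int collect(int workers, final int valuePerWorker) {
        // The queue plays the role of the collector's mailbox.
        final BlockingQueue<Result> mailbox = new LinkedBlockingQueue<>();
        for (int i = 0; i < workers; i++) {
            final int id = i;
            // A worker touches no shared state; it only sends a message.
            new Thread(() -> mailbox.add(new Result(id, valuePerWorker))).start();
        }
        int sum = 0;
        try {
            for (int received = 0; received < workers; received++) {
                sum += mailbox.take().value; // blocks until the next message arrives
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("sum=" + collect(12, 10)); // prints "sum=120"
    }
}
```

The order in which the messages arrive is not fixed, which is the same reason the output order of the Akka program can differ between runs.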
This program starts 12 workers and collects their results for final display. You may see messages about dead letters, and their number may vary between runs. The Akka docs on dead letters (snapshot or v2.2.1) explain why; the code is still working fine in this case.
Remember to add a few jars to your library/classpath: scala-library.jar, akka-actor_2.10-2.2.0.jar, and config-1.0.2.jar, all of which are in /lib of the akka-2.2.0 distribution.
package tutorial;

import akka.actor.*;
import akka.routing.RoundRobinRouter;

public class Example {

    public static void main(String[] args) {
        System.out.println("Starting main ...");
        Example xam = new Example();
        xam.callActorsToCalculate();
        // main returns before the actors finish; they run on the actor system's own threads
        System.out.println("Finishing main ..."); // note when this is displayed in the output
    }

    public void callActorsToCalculate() {
        // create the basic Akka system:
        ActorSystem xamSys = ActorSystem.create("example-system"); // no "_" allowed
        // set up the actor that will control the worker actors via messages; props first, then actor
        Props pMaster = Props.create(MyMasterActor.class, "master_hello", 12);
        final ActorRef masterRef = xamSys.actorOf(pMaster);
        System.out.println("...(properties for master) and now MasterActor ready");
        // start the master off with an empty message; main is not an actor, so there is no sender
        masterRef.tell(new StartCalculationMessage(), ActorRef.noSender());
        System.out.println("just sent tell 'StartCalculationMessage' to Master...");
    }

    public static class MyMasterActor extends UntypedActor {
        int numberOfWorkers;
        int numberOfResults = 0;
        int resultSum = 0;
        private final ActorRef workActionsRouter;

        public MyMasterActor(String name, int numOfSubWorkers) { // normal constructor
            numberOfWorkers = numOfSubWorkers;
            System.out.println("constructing " + numberOfWorkers + " workers with str:" + name);
            // create the router to send 'call to action' msg to workers
            workActionsRouter = getContext().actorOf(
                    Props.create(MyActorWorker.class)
                            .withRouter(new RoundRobinRouter(numberOfWorkers)),
                    "workerRouter");
            System.out.println("workerRouter ready");
        }

        @Override
        public void onReceive(Object message) throws Exception { // required method
            if (message instanceof StartCalculationMessage) {
                // the starting message has been received - kick off the actors
                // to do the work
                System.out.println("starting workers");
                for (int numWork = 0; numWork < numberOfWorkers; numWork++) {
                    workActionsRouter.tell(new WorkStartMsg(), getSelf());
                }
            } else if (message instanceof WorkerResult) {
                WorkerResult workerResult = (WorkerResult) message;
                System.out.println("master received message back from worker");
                numberOfResults++;
                // each result is weighted by its arrival position, so this is not a plain sum
                resultSum += workerResult.getResult() * numberOfResults;
                if (numberOfResults == numberOfWorkers) { // have all results come back?
                    // could have called a FinalActor with a Final msg instead
                    System.out.println("A message from MasterActor:"
                            + (numberOfResults + 100) + " sum=" + resultSum);
                    getContext().stop(getSelf()); // stopping the master (and thus its children)
                    // stop the system - seems like it should be in the method that
                    // started it, but that method has exited already
                    getContext().system().shutdown();
                }
            } else {
                System.out.println("Unhandled message in master");
                unhandled(message);
            }
        }
    }

    // empty class to use for messaging the start of work
    static class StartCalculationMessage {
    }

    // message sent to workers, in this case to start off the work
    static class WorkStartMsg {
    }

    // data class to store the result info
    static class WorkerResult {
        private final int result;

        WorkerResult(int value) { result = value; }

        public int getResult() { return result; }
    }

    static class MyActorWorker extends UntypedActor {
        @Override
        public void onReceive(Object message) throws Exception {
            if (message instanceof WorkStartMsg) {
                // unused, but could be used to set a starting point, e.g.
                WorkStartMsg workMsg = (WorkStartMsg) message;
                int result = 10; // set result to some value
                getSender().tell(new WorkerResult(result), getSelf());
                System.out.println("worker:start msg:" + this.toString() + "; & msg'd master");
            } else {
                System.out.println("Unhandled message in worker");
                unhandled(message);
            }
        }
    }
}
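To check what the program should print at the end, the routing and the master's aggregation can be modelled in plain Java without Akka. This is a sketch under assumptions: RoundRobinSketch, messagesPerRoutee, and masterSum are names invented for this example, and the routing is an idealized model in which a round-robin router hands message k to routee k mod n. The aggregation mirrors the master above, which multiplies each result by the running count of results before adding it.

```java
import java.util.Arrays;

// Plain-Java model of the listing's routing and aggregation (hypothetical names, no Akka).
final class RoundRobinSketch {

    // Counts how many of `messages` each of `routees` receives when they are handed out in turn.
    static int[] messagesPerRoutee(int messages, int routees) {
        int[] counts = new int[routees];
        for (int k = 0; k < messages; k++) {
            counts[k % routees]++; // message k goes to routee k mod routees
        }
        return counts;
    }

    // Mirrors the master's update: each result is multiplied by the running result count.
    static int masterSum(int workers, int resultPerWorker) {
        int numberOfResults = 0;
        int resultSum = 0;
        for (int i = 0; i < workers; i++) {
            numberOfResults++;
            resultSum += resultPerWorker * numberOfResults;
        }
        return resultSum;
    }

    public static void main(String[] args) {
        // 12 start messages over 12 routees: one message each
        System.out.println(Arrays.toString(messagesPerRoutee(12, 12)));
        // 10 * (1 + 2 + ... + 12) = 780
        System.out.println("sum=" + masterSum(12, 10)); // prints "sum=780"
    }
}
```

With 12 workers each returning 10, the master's final line should therefore read "A message from MasterActor:112 sum=780". Because every worker returns the same value, the arrival order of the results does not change this total.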
For other quick introductions, look here: http://doc.akka.io/docs/akka/2.0.2/intro/getting-started-first-java.html and http://java.dzone.com/articles/your-first-message-discovering
Details on some of the specifics around actors can be found here: http://doc.akka.io/docs/akka/snapshot/java/untyped-actors.html
A more detailed introduction to actors is here: http://www.javaworld.com/article/2078775/scripting-jvm-languages/open-source-java-projects-akka.html
A simple example for load testing: http://www.javacodegeeks.com/2012/05/processing-10-million-messages-with.html