本文介绍akka的基本使用方法,由于属于基础功能,想不出一个很高大上的名称,此处就以基础模式命名。下文会介绍actor的使用方法,及其优劣点。
class SimpleActor(name:String) extends Actor {
private def doWork(message:SayHello):Unit = {
println(s"$name 收到 ${message.from.path.name} 的消息 [$message] ,工作进行中... 当前线程号 ${Thread.currentThread().getId}")
}
override def receive: Receive = {
case msg @ SayHello(from,message) =>
doWork(msg)
val returnMsg = HelloSaid(s"嗨 ${from.path.name} ,${self.path.name} 收到了 $message 消息")
println(s"$name 工作结束,准备返回消息[${returnMsg.message}]")
}
}
object SimpleBasicPattern {
def main(args: Array[String]): Unit = {
val system = ActorSystem("BasicPattern",ConfigFactory.load())
val person1 = system.actorOf(Props(new SimpleActor("person1")),"personActor1")
println(s"Main thread Id ${Thread.currentThread().getId}")
person1 ! SayHello(person1,"Hello World 1")
person1 ! SayHello(person1,"Hello World 2")
}
}
输出:
Main thread Id 1
person1 收到 personActor1 的消息 [SayHello(Actor[akka://BasicPattern/user/personActor1#1662593548],Hello World 1)] ,工作进行中... 当前线程号 13
person1 工作结束,准备返回消息[嗨 personActor1 ,personActor1 收到了 Hello World 1 消息]
person1 收到 personActor1 的消息 [SayHello(Actor[akka://BasicPattern/user/personActor1#1662593548],Hello World 2)] ,工作进行中... 当前线程号 13
person1 工作结束,准备返回消息[嗨 personActor1 ,personActor1 收到了 Hello World 2 消息]
如上图,我设计了一个简单的actor:HelloWroldActor。它有两个方法,其中receive是收到消息之后处理消息的入口函数,定义了对消息的处理方式。收到SayHello之后,调用doWork同步处理消息
我们可以跟
上一篇博客进行对比,此处给person1发送了一条SayHello消息,在OOP中是直接调用函数,此处使用 ! 函数发送消息;person1收到消息后,同步调用doWork处理消息。这是最基本的actor使用方式:通过 ! 发消息给actor。从输出中可以看到,主线程和doWork所在线程是不同的线程。
这是基础模式的最基本形式,给actor发送消息,actor对消息进行响应,发送和响应是异步的,同一个actor对所有的消息都是按照邮箱队列的顺序,串行调用的。下面是基础模式的另外一种高级形式。
class HelloWorldActor(other:ActorRef,name:String) extends Actor {
private def doWork(message:String):HelloSaid = {
println(s"$name 收到 ${other.path.name} 的消息 [$message] ,工作进行中...")
HelloSaid("这是处理后返回的消息")
}
override def receive: Receive = {
case DoWork(message) =>
println(s"嗨 ${other.path.name} ,我正在为你工作")
val returnMsg = doWork(message)
other ! WorkDone(returnMsg.message)
case WorkDone(message) =>
println(s"$name 收到了 ${sender().path.name} 的回复消息:[$message]")
}
}
object BasicPattern {
def main(args: Array[String]): Unit = {
val system = ActorSystem("BasicPattern",ConfigFactory.load())
val person1 = system.actorOf(Props(new HelloWorldActor(null,"person1")),"personActor1")
val person2 = system.actorOf(Props(new HelloWorldActor(person1,"person2")),"personActor2")
person2 ! DoWork("Hello World")
}
}
输出:
嗨 personActor1 ,我正在为你工作
person2 收到 personActor1 的消息 [Hello World] ,工作进行中...
person1 收到了 personActor2 的回复消息:[嗨 personActor1 工作已完成,这是返回消息 HelloSaid(这是处理后返回的消息)]
在上面的模式中,我们首先给person2发送了开始工作的消息,person2收到消息后,开始为person1工作:调用doWork进行计算。计算结束后把消息发送给了person1,person1收到workDone的消息后,将结果打印了出来。这个例子稍微复杂点,涉及到了两个actor的通信。但这仍然是一种简单的形式,因为person1的actorRef引用是通过构造函数传递给person2的,这样person2就只能为person1工作。这非常不方便,因为actor创建的时候不一定能知道另外一个actor的地址。那么下面又是一种高级形式:
class HelloActor(name:String) extends Actor {
private def doWork(message:String,forActor:ActorRef):HelloSaid = {
println(s"$name 收到 ${forActor.path.name} 的消息 [$message] ,工作进行中...")
HelloSaid("这是处理后返回的消息")
}
override def receive: Receive = {
case DoWorkFor(message,forActor) =>
println(s"嗨 ${forActor.path.name} ,我正在为你工作")
val returnMsg = doWork(message,forActor)
forActor ! WorkDone(returnMsg.message)
case WorkDone(message) =>
println(s"$name 收到了 ${sender().path.name} 的回复消息:[$message]")
}
}
object BasicPattern3 {
def main(args: Array[String]): Unit = {
val system = ActorSystem("BasicPattern2",ConfigFactory.load())
val person1 = system.actorOf(Props(new HelloActor("person1")),"personActor1")
val person2 = system.actorOf(Props(new HelloActor("person2")),"personActor2")
person2 ! DoWorkFor("Hello World",person1)
}
}
输出:
嗨 personActor1 ,我正在为你工作
person2 收到 personActor1 的消息 [Hello World] ,工作进行中...
person1 收到了 personActor2 的回复消息:[这是处理后返回的消息]
在上面的图中,我们把person1的actorRef通过消息的形式发送给了person2,这样person2就能为不同的person工作了,因为工作的对象是通过消息传递的。
通过上面3个例子,我们可以看到,只能通过给actor发送消息与actor通信,调用其对应的函数,函数的返回结果也只能异步的发送给调用方。而在OOP中调用另一个对象的函数,看起来比这个简单多了,获取函数处理结果也非常简单。但读者要仔细思考这两者的区别,actor的通信全都是异步的。意味着person2给person1发送消息之后,可以立即进行其他的处理,而不需要等待person1的应答,即person1和person2功能做到了完全解耦。
BasicPattern2和BasicPattern3的区别是调用方获取方式的不同,其实还有另外一种形式:
class Master(workerPath:String) extends Actor{
override def receive: Receive = {
case DoWork(message) =>
println(s"master 收到 doWork消息:$message")
val worker = context.actorSelection( s"/user/$workerPath")
worker ! DoWorkFor(message,self)
case WorkDone(message) =>
println(s"master 收到 ${sender().path.name} 的返回消息 $message")
}
}
class Worker extends Actor{
private def doWork(message:String):String = {
println(s"worker 收到了消息 $message")
"这里是worker返回消息"
}
override def receive: Receive = {
case DoWorkFor(message,forActor) =>
val result = doWork(message)
forActor ! WorkDone(result)
}
}
object BasicPattern4 {
def main(args: Array[String]): Unit = {
val system = ActorSystem("BasicPattern2",ConfigFactory.load())
system.actorOf(Props(new Worker()),"workerActor")
val master = system.actorOf(Props(new Master("workerActor")),"masterWorker")
master ! DoWork("Hello World")
}
}
输出
master 收到 doWork消息:Hello World
worker 收到了消息 Hello World
master 收到 workerActor 的返回消息 这里是worker返回消息
在这个形式中,master通过worker的actorPath,用actorSelection查询了worker的地址,然后发送消息给它。与BasicPattern3不同的是,master不需要知道worker的邮箱地址,它只需要知道worker的actorPath就可以发消息了。然后master和worker就可以按照前面的pattern互通消息了。
请注意DoWork、WorkDone、DoWork这三个消息的处理完全是异步的,没有任何直接的关系。
上面的4个例子我都将其认定为基础模式,因为这都是Akka的基础功能,没有涉及太高深的技术,也是在学习Akka的初期最容易理解的模式。虽然简单,还是有很多值得学习的地方的。在下一篇博客中,我们会针对BasicPattern4进行优化,讲解另外MasterWorkerBackend模式,这种模式比较复杂,希望读者深刻理解本博文的4个例子,再阅读后续文章。