天天看點

akka設計模式系列-基礎模式

  本文介紹akka的基本使用方法,由于屬于基礎功能,想不出一個很高大上的名稱,此處就以基礎模式命名。下文會介紹actor的使用方法,及其優劣點。

class SimpleActor(name:String) extends Actor {
  private def doWork(message:SayHello):Unit = {
    println(s"$name 收到 ${message.from.path.name} 的消息 [$message] ,工作進行中... 目前線程号 ${Thread.currentThread().getId}")
  }
  override def receive: Receive = {
    case msg @ SayHello(from,message) =>
      doWork(msg)
      val returnMsg = HelloSaid(s"嗨 ${from.path.name} ,${self.path.name} 收到了 $message 消息")
      println(s"$name 工作結束,準備傳回消息[${returnMsg.message}]")
  }
}
object SimpleBasicPattern {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("BasicPattern",ConfigFactory.load())
    val person1 = system.actorOf(Props(new SimpleActor("person1")),"personActor1")
    println(s"Main thread Id ${Thread.currentThread().getId}")
    person1 ! SayHello(person1,"Hello World 1")
    person1 ! SayHello(person1,"Hello World 2")
  }
}
      
輸出:

Main thread Id 1
person1 收到 personActor1 的消息 [SayHello(Actor[akka://BasicPattern/user/personActor1#1662593548],Hello World 1)] ,工作進行中... 目前線程号 13
person1 工作結束,準備傳回消息[嗨 personActor1 ,personActor1 收到了 Hello World 1 消息]
person1 收到 personActor1 的消息 [SayHello(Actor[akka://BasicPattern/user/personActor1#1662593548],Hello World 2)] ,工作進行中... 目前線程号 13
person1 工作結束,準備傳回消息[嗨 personActor1 ,personActor1 收到了 Hello World 2 消息]
      

   如上圖,我設計了一個簡單的actor:HelloWroldActor。它有兩個方法,其中receive是收到消息之後處理消息的入口函數,定義了對消息的處理方式。收到SayHello之後,調用doWork同步處理消息

  我們可以跟

上一篇部落格

進行對比,此處給person1發送了一條SayHello消息,在OOP中是直接調用函數,此處使用 ! 函數發送消息;person1收到消息後,同步調用doWork處理消息。這是最基本的actor使用方式:通過 ! 發消息給actor。從輸出中可以看到,主線程和doWork所線上程是不同的線程。

  這是基礎模式的最基本形式,給actor發送消息,actor對消息進行響應,發送和響應是異步的,同一個actor對所有的消息都是按照郵箱隊列的順序,串行調用的。下面是基礎模式的另外一種進階形式。

class HelloWorldActor(other:ActorRef,name:String) extends Actor {
  private def doWork(message:String):HelloSaid = {
    println(s"$name 收到 ${other.path.name} 的消息 [$message] ,工作進行中...")
    HelloSaid("這是處理後傳回的消息")
  }
  override def receive: Receive = {
    case DoWork(message) =>
      println(s"嗨 ${other.path.name} ,我正在為你工作")
      val returnMsg = doWork(message)
      other ! WorkDone(returnMsg.message)
    case WorkDone(message) =>
      println(s"$name 收到了 ${sender().path.name} 的回複消息:[$message]")
  }
}
object BasicPattern {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("BasicPattern",ConfigFactory.load())
    val person1 = system.actorOf(Props(new HelloWorldActor(null,"person1")),"personActor1")
    val person2 = system.actorOf(Props(new HelloWorldActor(person1,"person2")),"personActor2")
    person2 ! DoWork("Hello World")
  }
}
      
輸出:

嗨 personActor1 ,我正在為你工作
person2 收到 personActor1 的消息 [Hello World] ,工作進行中...
person1 收到了 personActor2 的回複消息:[嗨 personActor1 工作已完成,這是傳回消息 HelloSaid(這是處理後傳回的消息)]
      

   在上面的模式中,我們首先給person2發送了開始工作的消息,person2收到消息後,開始為person1工作:調用doWork進行計算。計算結束後把消息發送給了person1,person1收到workDone的消息後,将結果列印了出來。這個例子稍微複雜點,涉及到了兩個actor的通信。但這仍然是一種簡單的形式,因為person1的actorRef引用是通過構造函數傳遞給person2的,這樣person2就隻能為person1工作。這非常不友善,因為actor建立的時候不一定能知道另外一個actor的位址。那麼下面又是一種進階形式:

class HelloActor(name:String) extends Actor {
  private def doWork(message:String,forActor:ActorRef):HelloSaid = {
    println(s"$name 收到 ${forActor.path.name} 的消息 [$message] ,工作進行中...")
    HelloSaid("這是處理後傳回的消息")
  }
  override def receive: Receive = {
    case DoWorkFor(message,forActor) =>
      println(s"嗨 ${forActor.path.name} ,我正在為你工作")
      val returnMsg = doWork(message,forActor)
      forActor ! WorkDone(returnMsg.message)
    case WorkDone(message) =>
      println(s"$name 收到了 ${sender().path.name} 的回複消息:[$message]")
  }
}
object BasicPattern3 {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("BasicPattern2",ConfigFactory.load())
    val person1 = system.actorOf(Props(new HelloActor("person1")),"personActor1")
    val person2 = system.actorOf(Props(new HelloActor("person2")),"personActor2")
    person2 ! DoWorkFor("Hello World",person1)
  }
}
      
輸出:
嗨 personActor1 ,我正在為你工作
person2 收到 personActor1 的消息 [Hello World] ,工作進行中...
person1 收到了 personActor2 的回複消息:[這是處理後傳回的消息]
      

   在上面的圖中,我們把person1的actorRef通過消息的形式發送給了person2,這樣person2就能為不同的person工作了,因為工作的對象是通過消息傳遞的。

  通過上面3個例子,我們可以看到,隻能通過給actor發送消息與actor通信,調用其對應的函數,函數的傳回結果也隻能異步的發送給調用方。而在OOP中調用另一個對象的函數,看起來比這個簡單多了,擷取函數處理結果也非常簡單。但讀者要仔細思考這兩者的差別,actor的通信全都是異步的。意味着person2給person1發送消息之後,可以立即進行其他的處理,而不需要等待person1的應答,即person1和person2功能做到了完全解耦。

  BasicPattern2和BasicPattern3的差別是調用方擷取方式的不同,其實還有另外一種形式:

class Master(workerPath:String) extends Actor{
  override def receive: Receive = {
    case DoWork(message) =>
      println(s"master 收到 doWork消息:$message")
      val worker = context.actorSelection( s"/user/$workerPath")
      worker ! DoWorkFor(message,self)
    case WorkDone(message) =>
      println(s"master 收到 ${sender().path.name} 的傳回消息 $message")
  }
}
class Worker extends Actor{
  private def doWork(message:String):String = {
    println(s"worker 收到了消息 $message")
    "這裡是worker傳回消息"
  }
  override def receive: Receive = {
    case DoWorkFor(message,forActor) =>
      val result = doWork(message)
      forActor ! WorkDone(result)

  }
}
object BasicPattern4 {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("BasicPattern2",ConfigFactory.load())
    system.actorOf(Props(new Worker()),"workerActor")
    val master = system.actorOf(Props(new Master("workerActor")),"masterWorker")
    master ! DoWork("Hello World")
  }
}
      
輸出

master 收到 doWork消息:Hello World
worker 收到了消息 Hello World
master 收到 workerActor 的傳回消息 這裡是worker傳回消息
      

   在這個形式中,master通過worker的actorPath,用actorSelection查詢了worker的位址,然後發送消息給它。與BasicPattern3不同的是,master不需要知道worker的郵箱位址,它隻需要知道worker的actorPath就可以發消息了。然後master和worker就可以按照前面的pattern互通消息了。

  請注意DoWork、WorkDone、DoWork這三個消息的處理完全是異步的,沒有任何直接的關系。

  上面的4個例子我都将其認定為基礎模式,因為這都是Akka的基礎功能,沒有涉及太高深的技術,也是在學習Akka的初期最容易了解的模式。雖然簡單,還是有很多值得學習的地方的。在下一篇部落格中,我們會針對BasicPattern4進行優化,講解另外MasterWorkerBackend模式,這種模式比較複雜,希望讀者深刻了解本博文的4個例子,再閱讀後續文章。

繼續閱讀