天天看點

Akka Actor模型的簡介與Actor的建立方式

Akka Actor其具有以下特點:

  • 系統中的所有事物都可以扮演一個Actor
  • Actor之間完全獨立
  • 在收到消息時,Actor所采取的所有動作都是并行的,在一個方法中的動作沒有明确的順序
  • Actor有辨別和目前行為描述
  • Actor可能被分成原始和非原始類别
  • 非原始Actor有一個由郵件位址表示的辨別
  • 目前行為由一組知識(執行個體變量或本地狀态)和定義Actor在收到消息時将采取的動作組成
  • 消息傳遞是非阻塞和異步的,其機制是郵件隊列
  • 所有消息發送都是并行的。

絕大多數流行語言的并發是基于多線程之間的共享記憶體,使用同步方法防止寫争奪,但Akka提供的則是基于Actor的并發模型。

Akka将複雜的Actor通信、Actor注冊、Actor查找進行了封裝。使用者在寫自己的Actor時,隻需要實作akka.actor.Actor這個接口。

在Akka架構中,每一個Actor都有一個唯一的URL,該URL的定義格式和網際網路位址的定義格式非常相似。

每一個Actor通過ActorSystem和Context初始化的時候,都會得到自己唯一的路徑,路徑格式如:akka.tcp://systemName@ip:port/user/topActorName/otherActorName,并且可以通過actorSelection(path)方法查找對應路徑的Actor對象,該方法傳回該Actor的引用。得到Actor後就可以發送消息了。

建立Actor

(1)通過實作akka.actor.Actor來建立Actor類

要定義自己的Actor類,需要繼承Actor并實作receive方法。receive方法需要定義一系列case語句,來描述Actor能夠處理哪些消息(使用标準的Scala模式比對),以及實作對消息如何進行處理的代碼。

import akka.actor._
object HelloScala {
  def main(args: Array[String]): Unit = {
    val _system = ActorSystem("HelloScalaNo7")
    // Props[MyActor]這裡一定要寫對
    val myActor = _system.actorOf(Props[MyActor],"iLoveMyActor")
    myActor ! "Hello My love"
  }
}
class MyActor extends Actor{
  override def receive: Receive = {
    case str: String => println(str)
    case _ =>
  }
}      

Akka Actor receive消息循環是“無窮無盡的”。在receive方法中,提供一個對它能夠接受消息的比對規則,如果希望處理未知的消息,可以像上述代碼中一樣提供一個預設的case分支,否則會有akka.actor.UnhandledMessage(message,sender,recipient)被釋出到Actor系統(ActorSystem)的事件(EventStream)中。

上述代碼中的actorOf的調用将傳回一個執行個體的引用,這個引用是Actor通路句柄,可以用它來與實際的Actor進行互動。

ActorRef是不可變量,與它所代表的Actor之間是一對一的關系。ActorRef是可序列化的,并且它攜帶了網絡資訊。這意味着可以将它序列化以後,通過網絡進行傳送,在遠端主機上它仍然代表原結點上的同一個Actor。

上述代碼中,Actor是從系統建立的。也可以在其他的Actor中,使用Actor上下文(context)來建立,其中的差別在于監管樹的組織方式。使用上下文時目前Actor将成為其建立子Actor的監管者。而使用系統建立的Actor将成為頂級Actor,它由系統(内部監管Actor)來監管。

(2)使用Actor的context建立Actor:

import akka.actor._
object HelloScala {

  def main(args: Array[String]): Unit = {
    val _system = ActorSystem("HelloScala")
    // firstActor是使用系統建立的Actor,它将成為頂級Actor,它由系統(内部監管Actor)來監管。
    val firstActor = _system.actorOf(Props[FirstActor],"firstActor")
    firstActor ! "Hello MyActor!How are you?"
  }
}


class MyActor extends Actor{
  override def receive: Receive = {
    case str: String => println(s"test actor receive # ${str}")
    case _ =>
  }
}

class FirstActor extends  Actor{
  // context 方法建立Actor,使用上下文時目前Actor将成為其建立子Actor的監管者,是以FirstActor成為MyActor的監管者
  val myActor = context.actorOf(Props[MyActor],"myActor")

  // 輸出監管目錄
  println(s"MyActor's monitor#${myActor.path.parent.getElements.toString}")

  // Actor啟動,preStart方法自動調用
  override def preStart(): Unit = println("FirstActor's preStart method was called!")

  override def receive: Receive = {
    case msg => myActor ! msg // 發給MyActor
    case _ =>
  }
}      

(3)使用非預設構造方法建立Actor

如果Actor的構造方法帶參數,那麼就不能使用actorOf(Props[TYPE])來建立。可以使用actorOf的非預設構造方法,這樣就可以用任意方式來建立Actor了。

import akka.actor._
object HelloScala {

  def main(args: Array[String]): Unit = {
    // 建立名為HelloAkka的ActorSystem
    val _system = ActorSystem("HelloAkka")
    // 非預設構造方法建立,在Props中new 一個Test,并傳入參數goat
    val testActor = _system.actorOf(Props(new Test("goat")),"testActor")
    testActor ! "Look at the stars!"
  }
}

class Test(name: String) extends Actor{
  // Actor啟動,自動調用preStart方法
  override def preStart(): Unit = println("Test Actor preStart method was called!")
  override def receive: Receive = {
    // 比對列印消息
    case str: String => println(s"actor's name:${name}#testActor receive msg:${str}")
    // 預設處理
    case _ =>
  }
}      
import akka.actor._
object HelloScala {

  def main(args: Array[String]): Unit = {
    // 建立名為HelloAkka的ActorSystem
    val _system = ActorSystem("HelloAkka")
    // 使用ActorSystem的actorOf工廠方法建立Test4 Actor
    val test4Actor = _system.actorOf(Props[Test4],"test4Actor")
    test4Actor ! "Look at the stars!HO HO HO "
  }
}

class Test4 extends Actor{
  // Actor啟動,自動調用preStart方法
  override def preStart(): Unit = println("Test4 Actor preStart method was called!")
  override def receive: Receive = {
    // 比對列印消息
    case str: String =>
      // 使用context的actorOf方法建立Actor
      context.actorOf(Props(new Actor {
        // 建立匿名Actor
        override def receive: Receive = {
          case msg: String =>
            println(s"anonymous Actor receive message#${msg}")
            context.stop(self) // 停止匿名Actor
        }
      })).forward(str)// forward會把消息轉發給剛剛定義的匿名Actor
    // 預設處理
    case _ =>
  }
}