天天看點

Akka使用入門

    • 一Akka簡單介紹
    • 二Akka簡單使用
      • 從建立一個scala項目說起
      • 第一個Akka應用
        • a 定義一個Actor
        • b 用戶端調用向actor發送消息
        • c Actor的生命周期
        • dActor程式設計模型的層次結構
      • akka的容錯機制
      • akka的遠端調用
        • 用戶端應用入口
        • 服務端入口
        • pojo類
        • 用戶端配置檔案
        • 服務端配置檔案
    • 三Spark20為什麼放棄Akka
    • 四Akka适用場景

一、Akka簡單介紹

Akka基于Actor模型,提供了一個用于建構可擴充的(Scalable)、彈性的(Resilient)、快速響應的(Responsive)應用程式的平台。

Actor是Akka中最核心的概念,它是一個封裝了狀态和行為的對象,Actor之間可以通過交換消息的方式進行通信,每個Actor都有自己的收件箱(Mailbox)。

通過Actor能夠簡化鎖及線程管理,可以非常容易地開發出正确地并發程式和并行系統,Actor具有如下特性:

+ 提供了一種進階抽象,能夠簡化在并發(Concurrency)/并行(Parallelism)應用場景下的程式設計開發

+ 提供了異步非阻塞的、高性能的事件驅動程式設計模型

+ 超級輕量級事件處理(每GB堆記憶體幾百萬Actor)

二、Akka簡單使用

1. 從建立一個scala項目說起

  1. 在D盤建立一個項目目錄,比如說叫AkkaDemo。
  2. 進入AkkaDemo目錄,建立一個build.gradle檔案,并在檔案中輸入一下内容:
apply plugin: 'idea'
apply plugin: 'scala'

task "createDirs" << {
    sourceSets*.scala.srcDirs*.each { it.mkdirs() }
    sourceSets*.resources.srcDirs*.each { it.mkdirs() }
}

repositories{
    mavenCentral()
    mavenLocal()
}

dependencies{
    compile "org.scala-lang:scala-library:2.10.4"
    compile "org.scala-lang:scala-compiler:2.10.4"
    compile "org.scala-lang:scala-reflect:2.10.4"
    compile "com.typesafe.akka:akka-actor_2.11:2.4.4"
    testCompile "junit:junit:4.11"
}

task run(type: JavaExec, dependsOn: classes) {
    main = 'Main'
    classpath sourceSets.main.runtimeClasspath
    classpath configurations.runtime
}
           
  1. 執行

    gradle cDirs

    指令,建立起項目骨架。
  2. 使用Idea導入gradle項目。File”->”Import Project”選擇打開build.gradle即可。

2. 第一個Akka應用

使用Akka架構進行應用開發,基本上遵循以下步驟即可:

1. 編寫一個Actor類,繼承特質akka.actor.Actor,同時可以編入一些其他的特質,如ActorLogging,用于記錄日志。

2. 實作Actor的receive方法,receive方法中定義一系列的case語句,基于标準Scala的模式比對方法,來實作每一種消息的處理邏輯。

3. 編寫程式入口,在入口程式中,建立一個頂層的ActorSystem。

4. 建立一個actor,可以使用ActorSystem或context的ActorOf方法建立,也可以使用context.actorSelection方法通過actor的名稱從上下文中查找。

5. 向actor發送消息, ! 代表發送。

a. 定義一個Actor

  • 定義一個Actor類,繼承Actor,編入ActorLogging特質(相當于實作一個接口)
  • 重寫Actor類的Receive方法,用于接收消息。
  • 使用scala的模式比對功能,根據不同消息做不同邏輯處理
class HelloActor extends Actor with ActorLogging{
  override def receive: Receive = {
    case "hello" => log.info("hello => 你好!")
    case msg if "world"==msg => log.info(s"$msg => 世界")
    case "quit"|"stop" => log.info("我要停機了!");context stop(self)
  }
}
           

b. 用戶端調用,向actor發送消息

以下為程式入口,其中actor為HelloActor執行個體的一個引用,通過!向actor發送消息;通過terminate方法關閉HelloService。

※※注意!※※

- ActorRef 類型的對象是不可變的,并且是可序列化的,可以在網絡中進行傳輸,作為遠端對象使用,具體的操作還是在本地Actor中進行。

- ActorRef在建立時可以不指定名稱,即actorOf(Props[Class])。但是如果指定名稱的話,需要保證在父級actor下名稱是唯一的。

- actor名稱不能是以“$”開頭的字元串

- actor可以有帶參構造,但是如果使用帶參構造則不能使用Props[Class]建立,應使用Props(new HelloActor(“”,…))方式執行個體化。

- Actor的unhandled方法對 receive 方法中未比對成功的消息進行處理,預設情況有兩種處理方式:當未處理消息類型是 akka.actor.Terminated 時,抛出 akka.actor.DeathPactException;當其它未處理消息時,向akka.event.EventStream 發送 akka.actor.UnhandledMessage 類型消息

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
object HelloAkka extends App{
  val helloService = ActorSystem("HelloService")
  val actor = helloService.actorOf(Props[HelloActor],"hello")
  actor ! "hello"
  actor ! "world"
  actor ! "stop"
  helloService terminate
}
           

c. Actor的生命周期

  • PreStart 方法隻在第一次建立時被調用,來初始化 actor 實體。
  • PostStop 方法一定在收件箱停止後才運作,用于關閉資源。
  • 在重新開機的同時,收件箱不會被影響,可以繼續接收消息。
  • 消息發給已經停止的 actor 會被轉發到系統的 deadLetters Actor。
  • Actor 的構造函數在第一次建立和每次重新開機時被調用,來初始化 acto

d.Actor程式設計模型的層次結構

在Akka中,一個ActorSystem是一個重量級的結構,他需要配置設定多個線程,是以在實際應用中,按照邏輯劃分的每個應用對應一個ActorSystem執行個體。

ActorSystem的Top-Level層次結構,與Actor關聯起來,稱為Actor路徑(Actor Path),不同的路徑代表了不同的監督範圍(Supervision Scope)。下面是ActorSystem的監督範圍:

+ “/”路徑:通過根路徑可以搜尋到所有的Actor

+ “/user”路徑:使用者建立的Top-Level Actor在該路徑下面,通過調用ActorSystem.actorOf來實作Actor的建立

+ “/system”路徑:系統建立的Top-Level Actor在該路徑下面

+ “/deadLetters”路徑:消息被發送到已經終止,或者不存在的Actor,這些Actor都在該路徑下面

+ “/temp”路徑:被系統臨時建立的Actor在該路徑下面

+ “/remote”路徑:改路徑下存在的Actor,它們的Supervisor都是遠端Actor的引用

3. akka的容錯機制

一個ActorSystem是具有分層結構(Hierarchical Structure)的:一個Actor能夠管理(Oversee)某個特定的函數,他可能希望将一個task分解為更小的多個子task,這樣它就需要建立多個子Actor(Child Actors),并監督這些子Actor處理任務的進度等詳細情況,實際上這個Actor建立了一個Supervisor來監督管理子Actor執行拆分後的多個子task,如果一個子Actor執行子task失敗,那麼就要向Supervisor發送一個消息說明處理子task失敗。需要知道的是,一個Actor能且僅能有一個Supervisor,就是建立它的那個Actor。基于被監控任務的性質和失敗的性質,一個Supervisor可以選擇執行如下操作選擇:

+ 重新開始(Resume)一個子Actor,保持它内部的狀态

+ 重新開機一個子Actor,清除它内部的狀态

+ 終止一個子Actor

+ 擴大失敗的影響,進而使這個子Actor失敗

4. akka的遠端調用

下面是一個使用者到餐廳點餐的案例,服務員屬于服務端,通過加載customer.conf配置,将服務釋出在1000端口上。用戶端Actor通過服務端暴露服務連結,從上下文中選擇一個遠端waiter。然後在自身的receive方法中根據需要向服務端發送消息,同時在receive方法中接收waiter傳回的消息。

val waiterServiceUrl = "akka.tcp://[email protected]:1000/user/waiter"
  val waiter = context.actorSelection(path = waiterServiceUrl)
           

用戶端應用入口

package demo

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

/**
  * 用戶端Actor,通過akka協定從上下文中查找服務端actor
  */
class Customer extends Actor with ActorLogging{
  val waiterServiceUrl = "akka.tcp://[email protected]:1000/user/waiter"
  val waiter = context.actorSelection(path = waiterServiceUrl)

  override def preStart(): Unit = log.info("customer is in!")
  override def postStop(): Unit = log.info("翔裡有毒,我挂了!")

  override def receive = {
    case DishesOrder(name) => log.info(s"我點的美味[$name]上來了,開吃...")
    case SoupOrder(name) => log.info(s"我點的鮮湯[$name]上來了,開喝...")
    case FoodOrder(name) => log.info(s"我點的主食[$name]上來了,開吃...")
    case DrinkOrder(name) => log.info(s"我點的飲料[$name]上來了,開喝...")
    case FruitOrder(name) => log.info(s"我點的水果[$name]上來了,吃不了了打包...")
    case menu:Menu => waiter ! menu
    case other => waiter ! other
  }
}

/**
  * 用戶端應用入口
  */
object CustomerService extends App{
  val customerService = ActorSystem("CustomerService",ConfigFactory.parseResources("customer.conf"))
  val customer = customerService.actorOf(Props[Customer],"customer")
  customerService.log.info("customer started!")
  customer ! Dishes(name=readLine("請輸入您要點的菜:")); Thread sleep L
  customer ! Soup(name=readLine("請輸入您要點的湯:")); Thread sleep L
  customer ! Food(name=readLine("請輸入您要點的主食:")); Thread sleep L
  customer ! Drink(name=readLine("請輸入您要點的飲料:")); Thread sleep L
  customer ! Fruit(name=readLine("請輸入您要點的水果:")); Thread sleep L
  customer ! readLine("您需要點别的嗎:")
  customerService.shutdown()
}
           

服務端入口

package demo

import akka.actor.{Actor, ActorLogging, ActorSystem, PoisonPill, Props}
import com.typesafe.config.ConfigFactory

class Waiter extends Actor with ActorLogging{
  override def preStart(): Unit = log.info("waiter is in!")
  override def postStop(): Unit = log.info("waiter is off!")
  override def receive = {
    case Dishes(name) => log.info(s"您點的[$name]菜品已下單."); sender ! DishesOrder(name)
    case Soup(name) => log.info(s"您點的[$name]湯已下單."); sender ! SoupOrder(name)
    case Food(name) => log.info(s"您點的主食[$name]已下單."); sender ! FoodOrder(name)
    case Drink(name) => log.info(s"您點的飲料[$name]已下單."); sender ! DrinkOrder(name)
    case Fruit(name) => log.info(s"您點的水果[$name]已下單."); sender ! FruitOrder(name)
    case other => log.info("您點的這個真沒有,免費送你一盆翔。。。"); sender ! PoisonPill
  }
}

/**
  * 服務端
  */
object WaiterService extends App{
  val waiterService = ActorSystem("WaiterService",ConfigFactory.parseResources("waiter.conf"))
  val waiter = waiterService.actorOf(Props[Waiter],"waiter")
  waiterService.log.info("waiterService started!")
}
           

pojo類

package demo
trait Menu
final case class Dishes(name:String) extends Menu
final case class Soup(name:String) extends Menu
final case class Food(name:String) extends Menu
final case class Drink(name:String) extends Menu
final case class Fruit(name:String) extends Menu
trait Order
final case class DishesOrder(name:String) extends Order
final case class SoupOrder(name:String) extends Order
final case class FoodOrder(name:String) extends Order
final case class DrinkOrder(name:String) extends Order
final case class FruitOrder(name:String) extends Order
           

用戶端配置檔案

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 
    }
  }
}
           

服務端配置檔案

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 
    }
  }
}
           

三、Spark2.0為什麼放棄Akka

  1. 很多Spark使用者也使用Akka,但是由于Akka不同版本之間無法互相通信,這就要求使用者必須使用跟Spark完全一樣的Akka版本,導緻使用者無法更新Akka。
  2. Spark的Akka配置是針對Spark自身來調優的,可能跟使用者自己代碼中的Akka配置沖突。
  3. Spark用的Akka特性很少,這部分特性很容易自己實作。同時,這部分代碼量相比Akka來說少很多,debug比較容易。如果遇到什麼bug,也可以自己馬上fix,不需要等Akka上遊釋出新版本。而且,Spark更新Akka本身又因為第一點會強制要求使用者更新他們使用的Akka,對于某些使用者來說是不現實的。

四、Akka适用場景

Akka适用場景非常廣泛,這裡根據一些已有的使用案例來總結一下,Akka能夠在哪些應用場景下投入生産環境:

  • 事務處理(Transaction Processing)

    線上遊戲系統、金融/銀行系統、交易系統、投注系統、社交媒體系統、電信服務系統。

  • 後端服務(Service Backend)

    任何行業的任何類型的應用都可以使用,比如提供REST、SOAP等風格的服務,類似于一個服務總線,Akka支援縱向&橫向擴充,以及容錯/高可用(HA)的特性。

  • 并行計算(Concurrency/Parallelism)

    任何具有并發/并行計算需求的行業,基于JVM的應用都可以使用,如使用程式設計語言Scala、Java、Groovy、JRuby開發。

  • 仿真

    Master/Slave架構風格的計算系統、計算網格系統、MapReduce系統。

  • 通信Hub(Communications Hub)

    電信系統、Web媒體系統、手機媒體系統。

  • 複雜事件流處理(Complex Event Stream Processing)

    Akka本身提供的Actor就适合處理基于事件驅動的應用,是以可以更加容易處理具有複雜事件流的應用。