-
- 一Akka簡單介紹
- 二Akka簡單使用
- 從建立一個scala項目說起
- 第一個Akka應用
- a 定義一個Actor
- b 用戶端調用向actor發送消息
- c Actor的生命周期
- dActor程式設計模型的層次結構
- akka的容錯機制
- akka的遠端調用
- 用戶端應用入口
- 服務端入口
- pojo類
- 用戶端配置檔案
- 服務端配置檔案
- 三Spark20為什麼放棄Akka
- 四Akka适用場景
一、Akka簡單介紹
Akka基于Actor模型,提供了一個用于建構可擴充的(Scalable)、彈性的(Resilient)、快速響應的(Responsive)應用程式的平台。
Actor是Akka中最核心的概念,它是一個封裝了狀态和行為的對象,Actor之間可以通過交換消息的方式進行通信,每個Actor都有自己的收件箱(Mailbox)。
通過Actor能夠簡化鎖及線程管理,可以非常容易地開發出正确地并發程式和并行系統,Actor具有如下特性:
+ 提供了一種進階抽象,能夠簡化在并發(Concurrency)/并行(Parallelism)應用場景下的程式設計開發
+ 提供了異步非阻塞的、高性能的事件驅動程式設計模型
+ 超級輕量級事件處理(每GB堆記憶體幾百萬Actor)
二、Akka簡單使用
1. 從建立一個scala項目說起
- 在D盤建立一個項目目錄,比如說叫AkkaDemo。
- 進入AkkaDemo目錄,建立一個build.gradle檔案,并在檔案中輸入一下内容:
apply plugin: 'idea'
apply plugin: 'scala'
task "createDirs" << {
sourceSets*.scala.srcDirs*.each { it.mkdirs() }
sourceSets*.resources.srcDirs*.each { it.mkdirs() }
}
repositories{
mavenCentral()
mavenLocal()
}
dependencies{
compile "org.scala-lang:scala-library:2.10.4"
compile "org.scala-lang:scala-compiler:2.10.4"
compile "org.scala-lang:scala-reflect:2.10.4"
compile "com.typesafe.akka:akka-actor_2.11:2.4.4"
testCompile "junit:junit:4.11"
}
task run(type: JavaExec, dependsOn: classes) {
main = 'Main'
classpath sourceSets.main.runtimeClasspath
classpath configurations.runtime
}
- 執行
指令,建立起項目骨架。gradle cDirs
- 使用Idea導入gradle項目。File”->”Import Project”選擇打開build.gradle即可。
2. 第一個Akka應用
使用Akka架構進行應用開發,基本上遵循以下步驟即可:
1. 編寫一個Actor類,繼承特質akka.actor.Actor,同時可以編入一些其他的特質,如ActorLogging,用于記錄日志。
2. 實作Actor的receive方法,receive方法中定義一系列的case語句,基于标準Scala的模式比對方法,來實作每一種消息的處理邏輯。
3. 編寫程式入口,在入口程式中,建立一個頂層的ActorSystem。
4. 建立一個actor,可以使用ActorSystem或context的ActorOf方法建立,也可以使用context.actorSelection方法通過actor的名稱從上下文中查找。
5. 向actor發送消息, ! 代表發送。
a. 定義一個Actor
- 定義一個Actor類,繼承Actor,編入ActorLogging特質(相當于實作一個接口)
- 重寫Actor類的Receive方法,用于接收消息。
- 使用scala的模式比對功能,根據不同消息做不同邏輯處理
class HelloActor extends Actor with ActorLogging{
override def receive: Receive = {
case "hello" => log.info("hello => 你好!")
case msg if "world"==msg => log.info(s"$msg => 世界")
case "quit"|"stop" => log.info("我要停機了!");context stop(self)
}
}
b. 用戶端調用,向actor發送消息
以下為程式入口,其中actor為HelloActor執行個體的一個引用,通過!向actor發送消息;通過terminate方法關閉HelloService。
※※注意!※※
- ActorRef 類型的對象是不可變的,并且是可序列化的,可以在網絡中進行傳輸,作為遠端對象使用,具體的操作還是在本地Actor中進行。
- ActorRef在建立時可以不指定名稱,即actorOf(Props[Class])。但是如果指定名稱的話,需要保證在父級actor下名稱是唯一的。
- actor名稱不能是以“$”開頭的字元串
- actor可以有帶參構造,但是如果使用帶參構造則不能使用Props[Class]建立,應使用Props(new HelloActor(“”,…))方式執行個體化。
- Actor的unhandled方法對 receive 方法中未比對成功的消息進行處理,預設情況有兩種處理方式:當未處理消息類型是 akka.actor.Terminated 時,抛出 akka.actor.DeathPactException;當其它未處理消息時,向akka.event.EventStream 發送 akka.actor.UnhandledMessage 類型消息
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
object HelloAkka extends App{
val helloService = ActorSystem("HelloService")
val actor = helloService.actorOf(Props[HelloActor],"hello")
actor ! "hello"
actor ! "world"
actor ! "stop"
helloService terminate
}
c. Actor的生命周期
- PreStart 方法隻在第一次建立時被調用,來初始化 actor 實體。
- PostStop 方法一定在收件箱停止後才運作,用于關閉資源。
- 在重新開機的同時,收件箱不會被影響,可以繼續接收消息。
- 消息發給已經停止的 actor 會被轉發到系統的 deadLetters Actor。
- Actor 的構造函數在第一次建立和每次重新開機時被調用,來初始化 acto
d.Actor程式設計模型的層次結構
在Akka中,一個ActorSystem是一個重量級的結構,他需要配置設定多個線程,是以在實際應用中,按照邏輯劃分的每個應用對應一個ActorSystem執行個體。
ActorSystem的Top-Level層次結構,與Actor關聯起來,稱為Actor路徑(Actor Path),不同的路徑代表了不同的監督範圍(Supervision Scope)。下面是ActorSystem的監督範圍:
+ “/”路徑:通過根路徑可以搜尋到所有的Actor
+ “/user”路徑:使用者建立的Top-Level Actor在該路徑下面,通過調用ActorSystem.actorOf來實作Actor的建立
+ “/system”路徑:系統建立的Top-Level Actor在該路徑下面
+ “/deadLetters”路徑:消息被發送到已經終止,或者不存在的Actor,這些Actor都在該路徑下面
+ “/temp”路徑:被系統臨時建立的Actor在該路徑下面
+ “/remote”路徑:改路徑下存在的Actor,它們的Supervisor都是遠端Actor的引用
3. akka的容錯機制
一個ActorSystem是具有分層結構(Hierarchical Structure)的:一個Actor能夠管理(Oversee)某個特定的函數,他可能希望将一個task分解為更小的多個子task,這樣它就需要建立多個子Actor(Child Actors),并監督這些子Actor處理任務的進度等詳細情況,實際上這個Actor建立了一個Supervisor來監督管理子Actor執行拆分後的多個子task,如果一個子Actor執行子task失敗,那麼就要向Supervisor發送一個消息說明處理子task失敗。需要知道的是,一個Actor能且僅能有一個Supervisor,就是建立它的那個Actor。基于被監控任務的性質和失敗的性質,一個Supervisor可以選擇執行如下操作選擇:
+ 重新開始(Resume)一個子Actor,保持它内部的狀态
+ 重新開機一個子Actor,清除它内部的狀态
+ 終止一個子Actor
+ 擴大失敗的影響,進而使這個子Actor失敗
4. akka的遠端調用
下面是一個使用者到餐廳點餐的案例,服務員屬于服務端,通過加載customer.conf配置,将服務釋出在1000端口上。用戶端Actor通過服務端暴露服務連結,從上下文中選擇一個遠端waiter。然後在自身的receive方法中根據需要向服務端發送消息,同時在receive方法中接收waiter傳回的消息。
val waiterServiceUrl = "akka.tcp://[email protected]:1000/user/waiter"
val waiter = context.actorSelection(path = waiterServiceUrl)
用戶端應用入口
package demo
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
/**
* 用戶端Actor,通過akka協定從上下文中查找服務端actor
*/
class Customer extends Actor with ActorLogging{
val waiterServiceUrl = "akka.tcp://[email protected]:1000/user/waiter"
val waiter = context.actorSelection(path = waiterServiceUrl)
override def preStart(): Unit = log.info("customer is in!")
override def postStop(): Unit = log.info("翔裡有毒,我挂了!")
override def receive = {
case DishesOrder(name) => log.info(s"我點的美味[$name]上來了,開吃...")
case SoupOrder(name) => log.info(s"我點的鮮湯[$name]上來了,開喝...")
case FoodOrder(name) => log.info(s"我點的主食[$name]上來了,開吃...")
case DrinkOrder(name) => log.info(s"我點的飲料[$name]上來了,開喝...")
case FruitOrder(name) => log.info(s"我點的水果[$name]上來了,吃不了了打包...")
case menu:Menu => waiter ! menu
case other => waiter ! other
}
}
/**
* 用戶端應用入口
*/
object CustomerService extends App{
val customerService = ActorSystem("CustomerService",ConfigFactory.parseResources("customer.conf"))
val customer = customerService.actorOf(Props[Customer],"customer")
customerService.log.info("customer started!")
customer ! Dishes(name=readLine("請輸入您要點的菜:")); Thread sleep L
customer ! Soup(name=readLine("請輸入您要點的湯:")); Thread sleep L
customer ! Food(name=readLine("請輸入您要點的主食:")); Thread sleep L
customer ! Drink(name=readLine("請輸入您要點的飲料:")); Thread sleep L
customer ! Fruit(name=readLine("請輸入您要點的水果:")); Thread sleep L
customer ! readLine("您需要點别的嗎:")
customerService.shutdown()
}
服務端入口
package demo
import akka.actor.{Actor, ActorLogging, ActorSystem, PoisonPill, Props}
import com.typesafe.config.ConfigFactory
class Waiter extends Actor with ActorLogging{
override def preStart(): Unit = log.info("waiter is in!")
override def postStop(): Unit = log.info("waiter is off!")
override def receive = {
case Dishes(name) => log.info(s"您點的[$name]菜品已下單."); sender ! DishesOrder(name)
case Soup(name) => log.info(s"您點的[$name]湯已下單."); sender ! SoupOrder(name)
case Food(name) => log.info(s"您點的主食[$name]已下單."); sender ! FoodOrder(name)
case Drink(name) => log.info(s"您點的飲料[$name]已下單."); sender ! DrinkOrder(name)
case Fruit(name) => log.info(s"您點的水果[$name]已下單."); sender ! FruitOrder(name)
case other => log.info("您點的這個真沒有,免費送你一盆翔。。。"); sender ! PoisonPill
}
}
/**
* 服務端
*/
object WaiterService extends App{
val waiterService = ActorSystem("WaiterService",ConfigFactory.parseResources("waiter.conf"))
val waiter = waiterService.actorOf(Props[Waiter],"waiter")
waiterService.log.info("waiterService started!")
}
pojo類
package demo
trait Menu
final case class Dishes(name:String) extends Menu
final case class Soup(name:String) extends Menu
final case class Food(name:String) extends Menu
final case class Drink(name:String) extends Menu
final case class Fruit(name:String) extends Menu
trait Order
final case class DishesOrder(name:String) extends Order
final case class SoupOrder(name:String) extends Order
final case class FoodOrder(name:String) extends Order
final case class DrinkOrder(name:String) extends Order
final case class FruitOrder(name:String) extends Order
用戶端配置檔案
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port =
}
}
}
服務端配置檔案
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port =
}
}
}
三、Spark2.0為什麼放棄Akka
- 很多Spark使用者也使用Akka,但是由于Akka不同版本之間無法互相通信,這就要求使用者必須使用跟Spark完全一樣的Akka版本,導緻使用者無法更新Akka。
- Spark的Akka配置是針對Spark自身來調優的,可能跟使用者自己代碼中的Akka配置沖突。
- Spark用的Akka特性很少,這部分特性很容易自己實作。同時,這部分代碼量相比Akka來說少很多,debug比較容易。如果遇到什麼bug,也可以自己馬上fix,不需要等Akka上遊釋出新版本。而且,Spark更新Akka本身又因為第一點會強制要求使用者更新他們使用的Akka,對于某些使用者來說是不現實的。
四、Akka适用場景
Akka适用場景非常廣泛,這裡根據一些已有的使用案例來總結一下,Akka能夠在哪些應用場景下投入生産環境:
-
事務處理(Transaction Processing)
線上遊戲系統、金融/銀行系統、交易系統、投注系統、社交媒體系統、電信服務系統。
-
後端服務(Service Backend)
任何行業的任何類型的應用都可以使用,比如提供REST、SOAP等風格的服務,類似于一個服務總線,Akka支援縱向&橫向擴充,以及容錯/高可用(HA)的特性。
-
并行計算(Concurrency/Parallelism)
任何具有并發/并行計算需求的行業,基于JVM的應用都可以使用,如使用程式設計語言Scala、Java、Groovy、JRuby開發。
-
仿真
Master/Slave架構風格的計算系統、計算網格系統、MapReduce系統。
-
通信Hub(Communications Hub)
電信系統、Web媒體系統、手機媒體系統。
-
複雜事件流處理(Complex Event Stream Processing)
Akka本身提供的Actor就适合處理基于事件驅動的應用,是以可以更加容易處理具有複雜事件流的應用。