1. 前言
Schedulerx2.0是阿裡中間件自研的基于akka架構的新一代分布式任務排程平台,提供定時、任務編排、分布式跑批等功能,具有高可靠、海量任務、秒級排程等能力。
本篇文章以Schedulerx2.0為例子,介紹akka的應用場景,希望能給同樣從事分布式系統開發的同學一些啟發。這裡不詳細介紹akka,初學者可以直接閱讀官方文檔(
https://doc.akka.io/docs/akka/current/index.html?language=java)。
2. Reactive
說到近幾年火熱的反應式程式設計,誰都能說幾句“異步、并發、非阻塞、高性能”等等,說到有代表性的項目,大家都知道RxJava、Akka、Reactor。
Why Reactive?
——因為Schedulerx2.0作為任務排程平台,支援海量任務排程,提供任務狀态機感覺任務狀态變化,需要Reactive的特性。
Why Akka?
——首先akka很簡單,每個actor隻需要實作一個onReceive方法。其次,Akka真的非常強大!我們可以看下官方文檔(
),Akka幾乎提供了一整套解決方案,使用akka可以很友善的實作一套高可靠、高并發、高性能的分布式系統。Schedulerx2.0也隻用到了akka生态圈裡的一小部分功能:
- akka-actor
- akka-eventbus:實作高性能工作流引擎
- akka-remoting:實作程序間通信
- akka-persistence:實作消息的At-Least-Once Delivery
3. Akka-actor in Schedulerx2.0
Schedulerx2.0支援百萬級别任務,一天上億次排程,從架構上來說,主要是
server無狀态,可水準擴充
基于akka-actor模型,單機性能高
Schedulerx2.0提供任務狀态機,如下圖

當有海量任務彙報任務狀态,單線程肯定是處理不過來的。如果用線程池又會遇到并發問題,比如目前按順序收到如下消息:
msg1: Instance=100 running
msg2: Instance=101 running
msg3: Instance=102 failed
msg4: Instance=101 success
msg5: Instance=100 failed
有可能instance=100先變成failed,最後變成running,導緻狀态機錯誤。
通過Akka-actor架構的模型,可以很容易處理這種場景:
如上圖所示,JobInstanceRoutingActor作為路由actor,用來轉發消息。下面挂載了很多jobInstanceActor,用來真實處理消息。
所有instance狀态的消息都發給JobInstanceRoutingActor,路由actor會把同一個instanceId的消息發給同一個jobInstanceActor,akka能保證一個actor按照消息接收的順序來處理消息,以此又能保證整個狀态機消息的順序性。
Schedulerx2.0中,大量采用了上面這種模型,來支撐job/workflow/instance等消息的傳遞。
4. 基于Akka-eventbus的Pub-Sub模式
在異步處理場景中,當然少不了Pub-Sub模式。相信很多人都用過guava的eventbus,可以很簡單很優雅的實作一套基于事件驅動的解決方案。通過@Subscribe注解就能注冊要訂閱的事件,通過@AllowConcurrentEvents注解還能設定并發消費事件。但是guava-eventbus在實作并發消費事件的時候非常暴力,公用一個線程池。這在Schedulerx2.0的應用場景中不太合适,比如某個job觸發頻率特别高,可能整個線程池都被他占滿了,造成其他job餓死。
在項目中大量使用actor模型之後,如果使用原生的actor通信會發現很困難,因為得知道actor的位址才能和他通信。如果有些actor要給多個actor發送消息,你的項目就會變成一個網狀的結構,新增一個actor經常會漏掉一些通信。這個時候我們就會想到Pub-Sub模式,所有actor通信隻需要給事件總線發送消息,每個actor隻需要訂閱自己的事件就好了。
如上圖所示,定時排程器、工作流引擎、任務狀态機等大部分子產品,都由akka-eventbus進行管理,每個子產品都是第四節定義的路由actor+業務actor的模型。通過該模型,相同的job交給同一個actor處理,不會堵塞其他actor,同樣解決了上文提到的guava-eventbus公用線程池的問題。實作類圖如下:
5. 兩行代碼實作程序間通信
Schedulerx2.0是Server-Worker的架構,server和worker,worker和worker都需要進行通信,使用akka-remoting可以很容易實作任意2個程序之間的通信。
Akka-remoting是peer-to-peer的通信方式,每個節點都會暴露一個遠端位址,其他節點隻要知道位址,就能進行遠端通信。Akka-remoting也抽象成一個actor,會讓你的程式保持高度的一緻,隻不過這個actor的位址是遠端的位址而已。Akka-remoting支援多種協定,使用起來非常簡單,以netty-tcp為例,首先我們在server端定義一個配置檔案akka-server.conf
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
port = 52014
}
}
}
Server隻需要2行代碼就可以起一個remote actor
ActorSystem actorSystem = ActorSystem.create("server", akkaConfig);
actorSystem.actorOf(HelloActor.props(), "hello");
Worker也隻需要2行代碼就能實作和server通信
ActorSelection helloSelection = context.actorSelection("akka.tcp://[email protected]:52014/user/hello");
helloSelection.tell("hello",getSelf());
對比Schedulerx1.0使用原生netty架構通信需要如下這麼多代碼
怎麼樣,使用akka進行遠端通信,是不是非常簡單和優雅^^
6. 消息At-Least-Once Delivery
Akka預設的消息傳遞是最多傳遞一次,即通過tell,如果發送失敗,不會重發。At-Least-Once Delivery,提供了一個消息至少傳遞一次的語義,即保證不丢!這在Schedulerx2.0中很多場景是非常需要的,比如某個執行個體在worker執行成功了,彙報成功的時候server正好重新開機了導緻彙報失敗,會造成工作流下遊都卡住沒法繼續執行。
使用At-Least-Once Delivery要繼承UntypedPersistentActorWithAtLeastOnceDelivery(akka-2.4.x)或者AbstractPersistentActorWithAtLeastOnceDelivery(akka-2.5.x)。Akka在2.5.x為了擁抱函數式程式設計,隻支援java8,并用了很多stream的接口,是以接口和2.4.x已經大大不一樣了。在Schedulerx2.0中,worker主要是給使用者用的,為了相容低版本的jdk,是以用了2.4.x版本的UntypedPersistentActorWithAtLeastOnceDelivery。
UntypedPersistentActorWithAtLeastOnceDelivery繼承UntypedPersistentActor和AtLeastOnceDelivery。
- UntypedPersistentActor:提供了持久化的actor,對消息持久化、恢複等能力。
- AtLeastOnceDelivery:主要是deliver、confirmDelivery(long deliveryId)兩個接口。
AtLeastOnceDelivery的原理非常簡單,worker向server彙報狀态的時候,tell改為deliver,deliver會自動生成一個deliveryId,封裝進request發送給server,server需要實作把deliveryId封裝到response中并傳回給worker,worker收到response的時候調用confiremDelivery,會從unconfirmed清單中移除這個deliveryId的request,否則AtLeastOnceDelivery會有一個timer,定期重試這條request。如下圖