天天看點

Akka in Schedulerx2.01. 前言2. Reactive3. Akka-actor in Schedulerx2.04. 基于Akka-eventbus的Pub-Sub模式5. 兩行代碼實作程序間通信6. 消息At-Least-Once Delivery

1. 前言

Schedulerx2.0

是阿裡中間件自研的基于akka架構的新一代分布式任務排程平台,提供定時、任務編排、分布式跑批等功能,具有高可靠、海量任務、秒級排程等能力。

本篇文章以Schedulerx2.0為例子,介紹akka的應用場景,希望能給同樣從事分布式系統開發的同學一些啟發。這裡不詳細介紹akka,初學者可以直接閱讀官方文檔(

https://doc.akka.io/docs/akka/current/index.html?language=java

)。

2. Reactive

說到近幾年火熱的反應式程式設計,誰都能說幾句“異步、并發、非阻塞、高性能”等等,說到有代表性的項目,大家都知道RxJava、Akka、Reactor。

Why Reactive?

——因為Schedulerx2.0作為任務排程平台,支援海量任務排程,提供任務狀态機感覺任務狀态變化,需要Reactive的特性。

Why Akka?

——首先akka很簡單,每個actor隻需要實作一個onReceive方法。其次,Akka真的非常強大!我們可以看下官方文檔(

),Akka幾乎提供了一整套解決方案,使用akka可以很友善的實作一套高可靠、高并發、高性能的分布式系統。Schedulerx2.0也隻用到了akka生态圈裡的一小部分功能:

  • akka-actor
  • akka-eventbus:實作高性能工作流引擎
  • akka-remoting:實作程序間通信
  • akka-persistence:實作消息的At-Least-Once Delivery

3. Akka-actor in Schedulerx2.0

Schedulerx2.0支援百萬級别任務,一天上億次排程,從架構上來說,主要是

server無狀态,可水準擴充

基于akka-actor模型,單機性能高

Schedulerx2.0提供任務狀态機,如下圖

Akka in Schedulerx2.01. 前言2. Reactive3. Akka-actor in Schedulerx2.04. 基于Akka-eventbus的Pub-Sub模式5. 兩行代碼實作程式間通信6. 消息At-Least-Once Delivery

當有海量任務彙報任務狀态,單線程肯定是處理不過來的。如果用線程池又會遇到并發問題,比如目前按順序收到如下消息:

msg1: Instance=100 running

msg2: Instance=101 running

msg3: Instance=102 failed

msg4: Instance=101 success

msg5: Instance=100 failed

有可能instance=100先變成failed,最後變成running,導緻狀态機錯誤。

通過Akka-actor架構的模型,可以很容易處理這種場景:

Akka in Schedulerx2.01. 前言2. Reactive3. Akka-actor in Schedulerx2.04. 基于Akka-eventbus的Pub-Sub模式5. 兩行代碼實作程式間通信6. 消息At-Least-Once Delivery

如上圖所示,JobInstanceRoutingActor作為路由actor,用來轉發消息。下面挂載了很多jobInstanceActor,用來真實處理消息。

所有instance狀态的消息都發給JobInstanceRoutingActor,路由actor會把同一個instanceId的消息發給同一個jobInstanceActor,akka能保證一個actor按照消息接收的順序來處理消息,以此又能保證整個狀态機消息的順序性。

Schedulerx2.0中,大量采用了上面這種模型,來支撐job/workflow/instance等消息的傳遞。

4. 基于Akka-eventbus的Pub-Sub模式

在異步處理場景中,當然少不了Pub-Sub模式。相信很多人都用過guava的eventbus,可以很簡單很優雅的實作一套基于事件驅動的解決方案。通過@Subscribe注解就能注冊要訂閱的事件,通過@AllowConcurrentEvents注解還能設定并發消費事件。但是guava-eventbus在實作并發消費事件的時候非常暴力,公用一個線程池。這在Schedulerx2.0的應用場景中不太合适,比如某個job觸發頻率特别高,可能整個線程池都被他占滿了,造成其他job餓死。

在項目中大量使用actor模型之後,如果使用原生的actor通信會發現很困難,因為得知道actor的位址才能和他通信。如果有些actor要給多個actor發送消息,你的項目就會變成一個網狀的結構,新增一個actor經常會漏掉一些通信。這個時候我們就會想到Pub-Sub模式,所有actor通信隻需要給事件總線發送消息,每個actor隻需要訂閱自己的事件就好了。

Akka in Schedulerx2.01. 前言2. Reactive3. Akka-actor in Schedulerx2.04. 基于Akka-eventbus的Pub-Sub模式5. 兩行代碼實作程式間通信6. 消息At-Least-Once Delivery

如上圖所示,定時排程器、工作流引擎、任務狀态機等大部分子產品,都由akka-eventbus進行管理,每個子產品都是第四節定義的路由actor+業務actor的模型。通過該模型,相同的job交給同一個actor處理,不會堵塞其他actor,同樣解決了上文提到的guava-eventbus公用線程池的問題。實作類圖如下:

Akka in Schedulerx2.01. 前言2. Reactive3. Akka-actor in Schedulerx2.04. 基于Akka-eventbus的Pub-Sub模式5. 兩行代碼實作程式間通信6. 消息At-Least-Once Delivery

5. 兩行代碼實作程序間通信

Schedulerx2.0是Server-Worker的架構,server和worker,worker和worker都需要進行通信,使用akka-remoting可以很容易實作任意2個程序之間的通信。

Akka-remoting是peer-to-peer的通信方式,每個節點都會暴露一個遠端位址,其他節點隻要知道位址,就能進行遠端通信。Akka-remoting也抽象成一個actor,會讓你的程式保持高度的一緻,隻不過這個actor的位址是遠端的位址而已。Akka-remoting支援多種協定,使用起來非常簡單,以netty-tcp為例,首先我們在server端定義一個配置檔案akka-server.conf

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
        port = 52014
    }
  }
}           

Server隻需要2行代碼就可以起一個remote actor

ActorSystem actorSystem = ActorSystem.create("server", akkaConfig);
actorSystem.actorOf(HelloActor.props(), "hello");           

Worker也隻需要2行代碼就能實作和server通信

ActorSelection helloSelection = context.actorSelection("akka.tcp://[email protected]:52014/user/hello");
helloSelection.tell("hello",getSelf());           

對比Schedulerx1.0使用原生netty架構通信需要如下這麼多代碼

Akka in Schedulerx2.01. 前言2. Reactive3. Akka-actor in Schedulerx2.04. 基于Akka-eventbus的Pub-Sub模式5. 兩行代碼實作程式間通信6. 消息At-Least-Once Delivery

怎麼樣,使用akka進行遠端通信,是不是非常簡單和優雅^^

6. 消息At-Least-Once Delivery

Akka預設的消息傳遞是最多傳遞一次,即通過tell,如果發送失敗,不會重發。At-Least-Once Delivery,提供了一個消息至少傳遞一次的語義,即保證不丢!這在Schedulerx2.0中很多場景是非常需要的,比如某個執行個體在worker執行成功了,彙報成功的時候server正好重新開機了導緻彙報失敗,會造成工作流下遊都卡住沒法繼續執行。

使用At-Least-Once Delivery要繼承UntypedPersistentActorWithAtLeastOnceDelivery(akka-2.4.x)或者AbstractPersistentActorWithAtLeastOnceDelivery(akka-2.5.x)。Akka在2.5.x為了擁抱函數式程式設計,隻支援java8,并用了很多stream的接口,是以接口和2.4.x已經大大不一樣了。在Schedulerx2.0中,worker主要是給使用者用的,為了相容低版本的jdk,是以用了2.4.x版本的UntypedPersistentActorWithAtLeastOnceDelivery。

UntypedPersistentActorWithAtLeastOnceDelivery繼承UntypedPersistentActor和AtLeastOnceDelivery。

  • UntypedPersistentActor:提供了持久化的actor,對消息持久化、恢複等能力。
  • AtLeastOnceDelivery:主要是deliver、confirmDelivery(long deliveryId)兩個接口。

AtLeastOnceDelivery的原理非常簡單,worker向server彙報狀态的時候,tell改為deliver,deliver會自動生成一個deliveryId,封裝進request發送給server,server需要實作把deliveryId封裝到response中并傳回給worker,worker收到response的時候調用confiremDelivery,會從unconfirmed清單中移除這個deliveryId的request,否則AtLeastOnceDelivery會有一個timer,定期重試這條request。如下圖

Akka in Schedulerx2.01. 前言2. Reactive3. Akka-actor in Schedulerx2.04. 基于Akka-eventbus的Pub-Sub模式5. 兩行代碼實作程式間通信6. 消息At-Least-Once Delivery