天天看點

Akka in JAVA(一)

AKKA簡介

什麼是AKKA

Akka是一個由

Scala

編寫的,能相容

Sacala

JAVA

的,用于編寫高可用和高伸縮性的

Actor模型

架構.它基于了事件驅動的并發處理模式,性能非常的高,并且有很高的可用性.大大的簡化了我們在應用系統中開發并發處理的過程.它在各個領域都有很好的表現.

使用AKKA的好處

就如上面簡介中所說的,AKKA把并發操作的各種複雜的東西都統一的做了封裝.我們主要關心的是業務邏輯的實作,隻需要少量的關心

Actor模型

的串聯即可建構出高可用,高性能,高擴充的應用.

Akka for JAVA

由于AKKA是使用

Scala

編寫的,而

Scala

是一種基于JVM的語言.是以

JAVA

對AKKA的支援也是很不錯的.Akka自身又是采用微核心的方式來實作的,這就意味着能很容易的在自己的項目中應用AKKA,隻需要引入幾個akka的Lib包即可.而官方直接就提供了

Maven

庫供我們在JAVA中使用AKKA.

這些AKKA的依賴包主要有:

  • akka-actor:最核心的依賴包,裡面實作了Actor模型的大部分東西
  • akka-agent:代理/整合了Scala中的一些STM特性
  • akka-camel:整合了Apache的Camel
  • akka-cluster:akka叢集依賴,封裝了叢集成員的管理和路由
  • akka-kernel:akka的一個極簡化的應用伺服器,可以脫離項目單獨運作.
  • akka-osgi:對OSGI容器的支援,有akka的最基本的Bundle
  • akka-remote:akka遠端調用
  • akka-slf4j:Akka的日志事件監聽
  • akka-testkit:Akka的各種測試工具
  • akka-zeromq:整合ZeroMQ

    其中最總要的就是

    akka-actor

    ,最簡單的AKKA使用的話,隻需要引入這個包就可以了.

Actor模型

什麼是Actor

既然說AKKA是一個

Actor模型

架構,那麼就需要搞清楚什麼是

Actor模型

.

Actor模型

是由

Carl Hewitt

于上世紀70年代提出的,目的是為了解決分布式程式設計中的一系列問題而産生.

Actor模型

中,一切都可以抽象為Actor.

而Actor是封裝了狀态和行為的對象,他們的唯一通訊方式就是交換消息,交換的消息放在接收方的郵箱(Inbox)裡.也就是說Actor之間并不直接通信,而是通過了消息來互相溝通,每一個Actor都把它要做的事情都封裝在了它的内部.

每一個Actor是可以有狀态也可以是無狀态的,理論上來講,每一個Actor都擁有屬于自己的輕量級線程,保護它不會被系統中的其他部分影響.是以,我們在編寫Actor時,就不用擔心并發的問題.

通過Actor能夠簡化鎖以及線程管理,Actor具有以下的特性:

  • 提供了一種進階的抽象,能夠封裝狀态和操作.簡化并發應用的開發.
  • 提供了異步的非阻塞的/高性能的事件驅動模型
  • 超級輕量級的線程事件處理能力.

要在JAVA中實作一個

Actor

也非常的簡單,直接繼承

akka.actor.UntypedActor

類,然後實作

public void onReceive(Object message) throws Exception

方法即可.

Actor系統

光有一個一個獨立的Actor顯然是不行的.Akka中還有一個

Actor System

.

Actor System

統管了

Actor

,是Actor的系統工廠或管理者,掌控了Actor的生命周期.

Akka in JAVA(一)

如上圖所示,我們可以通過

ActorSystem.create

來建立一個ActorSystem的執行個體.然後通過

actorOf

等方法來擷取

ActorRef

對象.

ActorRef

即為

Actor Reference

.它是Actor的一個引用,主要的作用是發送消息給它表示的Actor.而Actor可以通過通路

self()

sender()

方法來擷取到自身或消息發送者的Actor引用.通過引用發送消息.在Akka中,Actor之間永遠都不能直接的通信,必須通過他們的代理

ActorRef

建立通信.

Actor路徑

為了實作一切事物都是Actor,為了能把一個複雜的事物劃分的更細緻.Akka引入了父子Actor.也就是Actor是有樹形結構的關系的.這樣的父子結構就能遞歸的把任何複雜的事物原子化.這也是Actor模型的精髓所在.這樣做不僅使任務本身被清晰地劃分出結構,而且最終的Actor也能按照他們明确的消息類型以及處理流程來進行解析.這樣的遞歸結構使得消息能夠在正确的層次進行處理.

Akka in JAVA(一)

為了能管理父子結構的Actor,Akka又引入了

Actor Path

,也就是Actor路徑.

Actor路徑使用類似于URL的方式來描述一個Actor,

Actor Path

在一個

Actor System

中是唯一的.通過路徑,可以很明确的看出某個Actor的父級關系是怎樣的.

1
2
3
4
5
6
7
8      
//本地Actor
"akka://my-sys/user/service-a/worker1"

//遠端Actor
"akka.tcp://[email protected]:2552/user/service-b"

//叢集Actor服務
"cluster://my-cluster/service-c"      

以上三種就是Akka中支援的

Actor

路徑. 每一個通過ActorSystem建立出來的Actor都會有一個這樣的路徑.也可以通過這個路徑從ActorSystem中擷取一個

Actor

.

當我們建立一個ActorSystem的時候,AKKA會為該System預設的建立三個Actor,并處于不同的層次:

Akka in JAVA(一)

其中的

root guardian

是所有Actor的父.

User

Actor是所有使用者建立的Actor的父.它的路徑是

/user

,通過system.actorOf()建立出來的Actor都算是使用者的Actor,也都是這個Actor的子.

System

Actor是所有系統建立的Actor的父.它的路徑是

/system

,主要的作用是提供了一系列的系統的功能.

當我們查找一個Actor的時候,可以使用ActorSystem.actorSelection()方法.并且可以使用絕對路徑或者相對路徑來擷取.如果是相對路徑,那麼

..

表示的是父Actor.比如:

1
2
3      
ActorSelection selection = system.actorSelection("../brother");
ActorRef actor = selection.anchor();
selection.tell(xxx);      

同時,也可以通過通配符來查詢邏輯的Actor層級,比如:

1
2      
ActorSelection selection = system.actorSelection("../*");
selection.tell(xxx);      

這個就表示把消息發送給目前Actor之外的所有同級的Actor.

Hello AKKA Demo

原理講了這麼多,那麼我們就來看一看一個最簡單的Akka的例子吧.

這個是一個最簡單的打招呼的例子,這個例子中,定義了招呼,打招呼的人兩個對象或者說消息.然後定義了執行打招呼和列印招呼兩個Actor.然後通過ActorSystem整合整個打招呼的過程.

Greet.java

1
2
3
4
5
6
7
8      
/**
 * 用于表示執行打招呼這個操作的消息
 * @author SUN
 * @version 1.0
 * @Date 16/1/6 21:43
 */
public class Greet implements Serializable {
}      

Greeting.java

1
2
3
4
5
6
7
8
9
10
11
12      
/**
 * 招呼體,裡面有打的什麼招呼
 * @author SUN
 * @version 1.0
 * @Date 16/1/6 21:44
 */
public class Greeting implements Serializable {
    public final String message;
    public Greeting(String message) {
        this.message = message;
    }
}      

WhoToGreet.java

1
2
3
4
5
6
7
8
9
10
11
12      
/**
 * 打招呼的人
 * @author SUN
 * @version 1.0
 * @Date 16/1/6 21:41
 */
public class WhoToGreet implements Serializable {
    public final String who;
    public WhoToGreet(String who) {
        this.who = who;
    }
}      

Greeter.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21      
/**
 * 打招呼的Actor
 * @author SUN
 * @version 1.0
 * @Date 16/1/6 21:40
 */
public class Greeter extends UntypedActor{

    String greeting = "";
    
    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof WhoToGreet)
            greeting = "hello, " + ((WhoToGreet) message).who;
        else if (message instanceof Greet)
            // 發送招呼消息給發送消息給這個Actor的Actor
            getSender().tell(new Greeting(greeting), getSelf());

        else unhandled(message);
    }
}      

GreetPrinter.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14      
/**
 * 列印招呼
 * @author SUN
 * @version 1.0
 * @Date 16/1/6 21:45
 */
public class GreetPrinter extends UntypedActor{

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Greeting)
            System.out.println(((Greeting) message).message);
    }
}      

DemoMain.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43      
/**
 * @author SUN
 * @version 1.0
 * @Date 16/1/6 21:39
 */
public class DemoMain {

    public static void main(String[] args) throws Exception {
        final ActorSystem system = ActorSystem.create("helloakka");

        // 建立一個到greeter Actor的管道
        final ActorRef greeter = system.actorOf(Props.create(Greeter.class), "greeter");

        // 建立郵箱
        final Inbox inbox = Inbox.create(system);

        // 先發第一個消息,消息類型為WhoToGreet
        greeter.tell(new WhoToGreet("akka"), ActorRef.noSender());

        // 真正的發送消息,消息體為Greet
        inbox.send(greeter, new Greet());

        // 等待5秒嘗試接收Greeter傳回的消息
        Greeting greeting1 = (Greeting) inbox.receive(Duration.create(5, TimeUnit.SECONDS));
        System.out.println("Greeting: " + greeting1.message);

        // 發送第三個消息,修改名字
        greeter.tell(new WhoToGreet("typesafe"), ActorRef.noSender());
        // 發送第四個消息
        inbox.send(greeter, new Greet());
        
        // 等待5秒嘗試接收Greeter傳回的消息
        Greeting greeting2 = (Greeting) inbox.receive(Duration.create(5, TimeUnit.SECONDS));
        System.out.println("Greeting: " + greeting2.message);

        // 新建立一個Actor的管道
        ActorRef greetPrinter = system.actorOf(Props.create(GreetPrinter.class));
        
        //使用schedule 每一秒發送一個Greet消息給 greeterActor,然後把greeterActor的消息傳回給greetPrinterActor 
        system.scheduler().schedule(Duration.Zero(), Duration.create(1, TimeUnit.SECONDS), greeter, new Greet(), system.dispatcher(), greetPrinter);
        //system.shutdown();
    }
}      

以上就是整個Demo的所有代碼,并不多.接下來我們就分析一下這個程式.

首先是定義的幾個消息.在Akka中傳遞的消息必須實作

Serializable

接口.

WhoToGreet

消息表示了打招呼的人,

Greeting

表示了招呼的内容,而

Greet

表示了打招呼這個動作.

接着就是兩個最重要的Actor了.

GreetPrinter

非常簡單,接收到消息後,判斷消息的類型,如果是

Greeting

招呼内容,那麼就直接列印消息到控制台.而

Greeter

這個Actor稍微複雜點,它消費兩種不同的消息,如果是

WhoToGreet

,那麼就把要打招呼的人記錄到自己的上下文中,如果是

Greet

,那麼就構造出招呼的内容,并把消息回報回sender.

最後,再來分析下DemoMain.

  1. 一來,先建立了一個

    ActorSystem

    ,
  2. 然後建立了一個

    Greeter

    Actor的執行個體,命名為

    greeter

    .
  3. 接着,為這個Actor,顯示的建立了一個

    郵箱

    .
  4. 而後,調用

    greeter.tell(new WhoToGreet("akka"), ActorRef.noSender());

    ,表示給greeter這個Actor發送一個消息,消息的内容是

    WhoToGreet

    ,發送者是空.這就意味着在greeter這個Actor内部,調用sender是不能擷取到發送者的.通過這個動作,就把消息限定為了單向的.
  5. 再然後,通過

    inbox.send(greeter, new Greet());

    ,使用郵箱顯示的發送一個Greet消息給greeter.這是給Actor發送消息的另外一種方法,這種方法通常會有更高的自主性,能完成更多更複雜的操作.但是調用起來比直接使用

    ActorRef

    來的複雜.
  6. Greeting greeting1 = (Greeting) inbox.receive(Duration.create(5, TimeUnit.SECONDS));

    表示的就是嘗試在5秒鐘内,從

    Inbox

    郵箱中擷取到回報消息.如果5秒内沒有擷取到,那麼就抛出

    TimeoutException

    異常. 由于我們在greeter這個Actor中有處理,接收到

    Greet

    消息後,就構造一個

    Greeting

    消息給

    sender

    ,是以這個地方是能夠正确的擷取到消息的回報的.
  7. 後面的操作都是一樣的,就不再重複描述.
  8. 隻有最後一個代碼稍微有點不一樣

    system.scheduler().schedule(Duration.Zero(), Duration.create(1, TimeUnit.SECONDS), greeter, new Greet(), system.dispatcher(), greetPrinter);

    ,這個使用了

    ActorSystem

    中的排程功能.每一秒鐘給greeter這個Actor發送一個

    Greet

    消息,并指定消息的發送者是

    greetPrinter

    .這樣每隔一秒鐘,greeter就會收到

    Greet

    消息,然後構造成

    Greeting

    消息,又傳回給

    GreetPrinter

    這個Actor,這個Actor接收到消息後,列印出來.形成一個環流.

版權聲明:本文為CSDN部落客「weixin_34279579」的原創文章,遵循CC 4.0 BY-SA版權協定,轉載請附上原文出處連結及本聲明。

原文連結:https://blog.csdn.net/weixin_34279579/article/details/92519711