天天看點

十分鐘了解Actor模式

Actor模式是一種并發模型,與另一種模型共享記憶體完全相反,Actor模型share nothing。所有的線程(或程序)通過消息傳遞的方式進行合作,這些線程(或程序)稱為Actor。共享記憶體更适合單機多核的并發程式設計,而且共享帶來的問題很多,程式設計也困難。随着多核時代和分布式系統的到來,共享模型已經不太适合并發程式設計,是以幾十年前就已經出現的Actor模型又重新受到了人們的重視。MapReduce就是一種典型的Actor模式,而在語言級對Actor支援的程式設計語言Erlang又重新火了起來,Scala也提供了Actor,但是并不是在語言層面支援,Java也有第三方的Actor包,Go語言channel機制也是一種類Actor模型。

單線程程式設計

多線程程式設計-共享記憶體

到了多核時代,有多個勞工,這些勞工共同使用一個倉庫和工廠中的房間,幹什麼都要排隊。比如我要從一塊鋼料切出一塊來用,我得等别人先用完。有個扳手,另一個人在用,我得等他用完。兩個人都要用一個切割機從一塊鋼材切一塊鋼鐵下來用,但是一個人拿到了鋼材,一個人拿到了切割機,他們互相都不退讓,結果誰都幹不了活。

<a href="http://s3.51cto.com/wyfs02/M00/6E/FD/wKiom1WOKpHgGDoXAAFY8MuvOgA842.jpg" target="_blank"></a>

假如現在有一個任務,找100000以内的素數的個數,最多使用是個線程,如果用共享記憶體的方法,可以用下面的代碼實作。可以看到,這些線程共享了currentNum和totalPrimeCount,對它們做操作時必須上鎖。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

<code>public</code> <code>class</code> <code>PrimeCount </code><code>implements</code> <code>Runnable {</code>

<code>   </code> 

<code>    </code><code>private</code> <code>int</code> <code>currentNum = </code><code>2</code><code>;  </code><code>//從2開始找</code>

<code>    </code><code>private</code> <code>int</code> <code>totalPrimeCount = </code><code>0</code><code>; </code><code>//目前已經找到的</code>

<code>    </code> 

<code>    </code><code>//取一個數,不能重複,最大到100000</code>

<code>    </code><code>private</code> <code>int</code> <code>incrCurrentNum() { </code>

<code>        </code><code>synchronized</code> <code>(</code><code>this</code><code>) {     </code><code>//如果不用鎖,必然會出錯。</code>

<code>            </code><code>if</code><code>(currentNum &gt; </code><code>100000</code><code>) {</code>

<code>                </code><code>return</code> <code>-</code><code>1</code><code>;</code>

<code>            </code><code>} </code><code>else</code> <code>{</code>

<code>                </code><code>int</code> <code>result = currentNum;</code>

<code>                </code><code>currentNum++;</code>

<code>                </code><code>return</code> <code>result;</code>

<code>            </code><code>}  </code>

<code>        </code><code>}</code>

<code>    </code><code>}</code>

<code>   </code><code>//把某個線程找到的素數個數加上</code>

<code>    </code><code>private</code> <code>void</code> <code>accPrimeCount(</code><code>int</code> <code>count) { </code>

<code>        </code><code>synchronized</code> <code>(</code><code>this</code><code>) {</code>

<code>            </code><code>totalPrimeCount += count;</code>

<code>    </code><code>@Override</code>

<code>     </code><code>//一直取數并判斷是否為素數,取不到了就把找到的個數累加</code>

<code>    </code><code>public</code> <code>void</code> <code>run() { </code>

<code>        </code><code>int</code> <code>primeCount = </code><code>0</code><code>;</code>

<code>        </code><code>int</code> <code>num;</code>

<code>        </code><code>while</code><code>((num=incrCurrentNum()) != -</code><code>1</code><code>) {</code>

<code>            </code><code>if</code><code>(isPrime(num)) {</code>

<code>                </code><code>primeCount++;</code>

<code>            </code><code>}</code>

<code>        </code><code>accPrimeCount(primeCount);</code>

<code>    </code><code>private</code> <code>boolean</code> <code>isPrime(</code><code>int</code> <code>num) {</code>

<code>        </code><code>for</code><code>(</code><code>int</code> <code>i = </code><code>2</code><code>; i &lt; num; i++) {</code>

<code>            </code><code>if</code><code>(num % i == </code><code>0</code><code>) {</code>

<code>                </code><code>return</code> <code>false</code><code>;</code>

<code>        </code><code>return</code> <code>true</code><code>;</code>

<code>    </code><code>} </code>

<code>    </code><code>@SuppressWarnings</code><code>(</code><code>"static-access"</code><code>)</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args){</code>

<code>        </code><code>PrimeCount pc = </code><code>new</code> <code>PrimeCount();</code>

<code>        </code><code>for</code><code>(</code><code>int</code> <code>i = </code><code>0</code><code>; i &lt; </code><code>10</code><code>; i++) {</code>

<code>            </code><code>new</code> <code>Thread(pc).start();</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>Thread.currentThread().sleep(</code><code>5000</code><code>);</code>

<code>        </code><code>} </code><code>catch</code> <code>(InterruptedException e) {</code>

<code>            </code><code>// TODO Auto-generated catch block</code>

<code>            </code><code>e.printStackTrace();</code>

<code>        </code><code>System.out.println(pc.getTotalPrimeCount());</code>

<code>    </code><code>public</code> <code>int</code> <code>getTotalPrimeCount() {</code>

<code>        </code><code>return</code> <code>totalPrimeCount;</code>

<code> </code> 

<code>}</code>

多線程/分布式程式設計-Actor模型

到了分布式系統時代,工廠已經用流水線了,每個人都有明确分工,這就是Actor模式。每個線程都是一個Actor,這些Actor不共享任何記憶體,所有的資料都是通過消息傳遞的方式進行的。

<a href="http://s3.51cto.com/wyfs02/M00/6E/FD/wKiom1WOKyPBDRjpAAGSeStUphQ651.jpg" target="_blank"></a>

如果用Actor模型實作統計素數個數,那麼我們需要1個actor做原料的分發,就是提供要處理的整數,然後10個actor加工,每次從分發actor那裡拿一個整數進行加工,最終把加工出來的半成品發給組裝actor,組裝actor把10個加工actor的結果彙總輸出。

用scala實作,下面是工程的結構:

<a href="http://s3.51cto.com/wyfs02/M01/6F/74/wKioL1WdJcqjpAFzAAD5VFWMxWE737.jpg" target="_blank"></a>

這是它們傳遞的消息,有一些指令,剩下的都是Int資料:

<a href="http://s3.51cto.com/wyfs02/M02/6F/78/wKiom1WdJGejogeEAADFcGfvlHM213.jpg" target="_blank"></a>

一個Actor的代碼結構一般是下面這種結構,不停的接受消息并處理,沒有消息就等待:

<a href="http://s3.51cto.com/wyfs02/M01/6F/74/wKioL1WdJniAb7CiAACCrB88e3w417.jpg" target="_blank"></a>

組裝者代碼:

<a href="http://s3.51cto.com/wyfs02/M01/6F/78/wKiom1WdJQuC2KISAAD_TzvsHmU128.jpg" target="_blank"></a>

分發者代碼:

<a href="http://s3.51cto.com/wyfs02/M02/6F/78/wKiom1WdJT6ToafUAAHzO5b-wbg835.jpg" target="_blank"></a>

加工者代碼:

<a href="http://s3.51cto.com/wyfs02/M00/6F/75/wKioL1WdJ8aQb3cnAAH3zqiR_OY263.jpg" target="_blank"></a>

主線程代碼:

<a href="http://s3.51cto.com/wyfs02/M00/6F/78/wKiom1WdJkDCESVPAAHn1a3-nqU963.jpg" target="_blank"></a>

工程代碼可以在附件中下載下傳。這個代碼實作的效果與前面用Java實作的是一樣的,但是各個線程沒有共享記憶體,也沒有鎖,這樣開發起來容易,而且更适合分布式程式設計,因為分布式程式設計本身就不适合共享記憶體。Scala的Actor不能原生的支援分布式,但是Erlang可以,使用Erlang的Actor,分布式程式設計就和本地程式設計基本一樣。但是Erlang的文法難懂,而且沒有變量,幾乎所有需要使用循環的地方都得用遞歸。

本文轉自nxlhero 51CTO部落格,原文連結:http://blog.51cto.com/nxlhero/1666250,如需轉載請自行聯系原作者