天天看點

POE狀态機入門與進階

一.前言

至于POE的應用,我不想多說什麼,因為需要使用狀态機的地方太多。

舉一個極端的例子,windows下的perl-tk對于多線程的支援極不穩定,如果在其中加入一個大資料量的處理應用,

結果往往會是一個無法動彈的程式。這時除了使用POE,也許沒有更好的解決辦法了。

另外,python中有叫twisted的類似架構,被廣泛地應用在網絡服務中,

具體的使用方法可以參考Oreilly出版的《Python Twisted Network Programming Essentials》。

某種意義上,也說明了狀态機的重要性。

二.基本原理與概念

在詳細說明POE伺服器的建立步驟之前,需要對POE的原理做一個大緻的了解。

在這個階段中,我們将描述這方面的不同概念,以及它們在POE中的應用。

1. 事件與事件句柄

POE 是一個為網絡工作和并行任務服務的事件驅動架構。

首先作為前提,需要掌握事件和事件驅動程式設計的意義。

在抽象意義上,事件就是真實世界中發生的一件事情。

比如說:早晨打鈴、面包從烤機裡彈出、茶煮好了等。而在使用者界面上最常見的事件則是滑鼠移動、按鈕點選和鍵盤敲打等等。

具體到程式軟體事件,則往往是一些抽象的事件。

也就是說,它不僅包括了發送給程式的外部活動,而且也包括了一些在作業系統内部運作的事件。

比如說,計時器到點了,socket建立了連接配接,下載下傳完成等。

在事件驅動程式中,中心配置設定器的作用是将事件配置設定給不同的處理程式。

這些處理程式就是事件句柄,顧名思義,它們的任務就是處理相應事件。

POE的事件句柄之間的關系是合作性質的。

沒有兩個句柄會同時運作,每一個句柄在被激發運作期間将獨占程式。

通過盡可能快地傳回來保證程式的其它部分得以順暢運作,這就是事件句柄之間的合作方式。

2. POE程式的組成部分

最簡單的POE程式包括兩個子產品和一些使用者代碼:它們分别是POE::Kernel,POE::Session以及一些事件句柄。

a. POE::Kernel:

POE::Kernel提供了基于事件的作業系統核心服務。

包括I/O事件、警報和其它計時事件、信号事件和一些不被人意識到的事件。

POE::Kernel提供不同的方法對這些事件進行設定,比如select_read(), delay()和sig()。

POE::Kernel還能夠跟蹤事件發生源和與之相關的任務之間的關系。

之是以能夠這麼做,是因為當事件發生時,它将跟蹤哪個任務被激活了。

于是它便知道了哪個任務調用方法來使用了這些資源,而這些都是自動完成的。

POE::Kernel也知道何事需将任務銷毀。它檢測任務以确定是否還有事件需要處理,或者是哪個事需要釋放占用的資源。當任務沒有事件可以觸發的時候,POE::Kernel就自動銷毀該資源。

POE::Kernel會在最後一個session停止以後終止運作。

b. POE::Session:

POE::Session執行個體就是上面所講的由POE::Kernel管理的“任務”。(以下的章節中為了便于識别将使用“session”)

每一個session都有一個自己私有的存儲空間,叫“heap”。存儲在目前session的heap中的資料很難被一個外部session得到。

每一個session還擁有自己的資源和事件句柄。這些資源為擁有它們的session生成事件,而事件隻被指派到其所處的session中。

舉例說明,有多個session都可以設定相同的警報,并且任何一個都能接受其所請求的計時事件。

但所有其他session不會在意發生在它們之外的事情。

c. 事件句柄:

事件句柄就是Perl程式。

它們因為使用了POE::Kernel傳遞的參數而不同于一般的perl程式。

POE::Kernel是通過@_來傳遞參數。該數組的前七個成員定義了發生該事件的session的上下文。

它包括了一個指向POE::Kernel運作執行個體的引用、事件自身的名字、指向私有heap的引用以及指向發出事件的session的引用。

@_中剩下的成員屬于事件自身,其中的具體内容依照被指派的事件類型而定。

舉例說明:對于I/O事件,包括兩個參數:一個是緩沖檔案句柄,另一個是用來說明采取何種行為(input、output或者異常)的标記。

POE不強求程式員為每一個事件句柄配置設定所有的參數,要不然這将變成一件非常煩人的工作,因為它們中的一些參數是不常被用到的。

而POE::Session會自動為@_輸出剩餘的常量,這樣就能使我們相對比較輕松地将注意力放在重要的參數上,而讓POE來處理不必需的參數。

POE還允許改變參數的順序和數量,而不會對程式造成影響。

比如說,KERNEL,HEAP和ARG0分别是 POE::Kernel執行個體、目前session的堆棧和事件的第一個使用者參數。

它們可以一個個直接從@_被導出。

my $kernel = $_[KERNEL];

my $heap = $_[HEAP];

my $thingy = $_[ARG0];

或者一次性以隊列片段的形式指派給程式參數。

my ( $kernel, $heap, $thingy ) = @_[KERNEL, HEAP, ARG0];

當然在事件句柄中我們也可以直接使用$_[KERNEL],$_[HEAP]和$_[AG0]。

但是因為諸如ARG0的參數很難從字面上知道它在事件中代表的真實意義, 是以我們不提倡直接使用這種做法。

三.簡單的POE例子

現在大緻知道了POE程式設計的概念,我們将舉若幹例子來了解它到底是怎麼運作的。

1. 一個單session的例子

簡單的POE程式包括三個部分:

 一個用來加載子產品和配置條件的前端,

 初始化并且運作一個或者多個session的主體

 和用來描述事件句柄的具體程式。

a. 前端:

#!/usr/bin/perl

use warnings;

use strict;

use POE;

引入POE子產品的過程的背後隐藏了一些細節,

事實上這麼做還加載了一些諸如POE::Kernel和POE:Sesson的子產品并相應地做了一些初始化,而通常在每個POE程式中我們都會用到這些隐藏子產品。

在POE::Kernel第一次被引入時,它生成了一個将貫穿整個程式POE::Kernel執行個體。

POE::Session會根據不同的事件輸出預設常量給事件句柄的某些參數,如:KERNEL,HEAP,ARG0等等。

是以一個簡單的use POE為程式做了大量的初始化工作。

b. 主體session:

當所有的條件都準備好之後,為了保證POE::Kernel的有效運作,我們必須建立至少一個session。不然的話,運作程式意味着無事可做。

在這個例子裡,我們将建立一個包含_start,_stop和count這三個事件的任務。

POE::Session将每個事件與一個句柄聯系在一起。

POE::Session->create{

    Inline_states => {

        _start => \&session_start,

        _stop => \&session_stop,

        count => \&session_count,

    }

};

前兩個事件是由POE::Kernel自身所提供的。它們分别表示該session的啟動和銷毀。

最後一個事件是使用者自定義事件,它被用于程式的邏輯之中。

我們之是以沒有為session保留一個引用的原因是因為該session會自動被注冊到POE::Kernel中,并接收它的管理,

而我們在程式是很少直接使用該session的。

事實上,儲存一個session的應用是存在危險的。

因為如果存在顯式的引用,Perl将不會自動銷毀session對象或者重新為其配置設定記憶體。

接着我們啟動POE::Kernel,由此便建立了一個用來探測并分派事件的主循環。

在此示例程式中,為了使運作結果更加明确,我們将注明POE::Kernel運作的開始處和結束點。

print “Starting POE::Kernel.\n”;

POE::Kernel->run();

print “POE::Kernle’s run method returned.\n”;

exit;

Kernel的run方法隻有在所有session傳回之後才會停止循環。

之後,我們調用一個表示程式結束的提示符的exit系統方法來表示程式被終止,而在實際的應用中這麼做是不必要的。

c. 事件句柄:

下面我們來了解一下事件句柄的應用,首先從_start開始。

_start的句柄将在sesson初始化完成之後開始運作,session在其自身的上下文中使用它來實作輸入引導。

比如初始化heap中的值,或者配置設定一些必要的資源等等。

在該句柄中我們建立了一個累加器,并且發出了“count”事件以觸發相應的事件句柄。

sub session_start {

    print "Session ", $_[SESSION]->ID, " has started.\n";

    $_[HEAP]->{count} = 0;

    $_[KERNEL]->yield("count");

}

一些熟悉線程程式設計的人可能會對yield方法在這裡的使用産生困惑。

事實上,它并不是用來中止session運作的,而是将一個事件放入fifo分派隊列的末尾處,

當隊列中在其之前的事件被處理完畢之後,該事件将被觸發以運作相應的事件句柄。

這個概念在多任務環境下更容易被了解。我們可以通過在調用yield方法之後立即傳回的辦法,來清晰地展現yield方法的行為。

接下來的是_stop句柄。POE::Kernel将在所有session再無事件可觸發之後,并且是在自身被銷毀之前調用它。

sub session_stop {

    print "Session ", $_[SESSION]->ID, " has stopped.\n";

}

在_stop中設定一個事件是無用的。

銷毀session的過程本身包括清理與之相關的資源,而事件就是資源的組成部分。

是以對于所有在_stop中的事件,在其能夠被分派之前都是将被清理的。

最後講一下count事件句柄。

該函數用來增加heap中的累加器計數,并列印累加結果。我們可以使用一個while來完成這件工作,

​但是用yield方法一來可以使得程式更短小精悍,二來還能夠加深對POE事件處理原理的了解。

sub session_count {

    my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];

    my $session_id = $_[SESSION]->ID;

    my $count = ++$heap->{count};

    print "Session $session_id has counted to $count.\n";

    $kernel->yield("count") if $count < 10;

}

該函數的最後一句表示:

​隻要累加器計數未超過10,session将再yield一個count事件。

​因為不斷地觸發了session_count句柄,使得目前session可以繼續得以生存而不會被POE::Kernel清理。

當計數器到10時,便不再調用yield指令,session也将停止。

​一旦POE::Kernel檢測到該session再沒有事件句柄可被激發,便在調用_stop事件句柄之後将其清理銷毀。

以下是運作的結果:

  Session 2 has started.

  Starting POE::Kernel.

  Session 2 has counted to 1.

  Session 2 has counted to 2.

  Session 2 has counted to 3.

  Session 2 has counted to 4.

  Session 2 has counted to 5.

  Session 2 has counted to 6.

  Session 2 has counted to 7.

  Session 2 has counted to 8.

  Session 2 has counted to 9.

  Session 2 has counted to 10.

  Session 2 has stopped.

  POE::Kernel's run() method returned.

對于該結果有幾點需要解釋一下。

A. 為什麼在運作結果中session的id是2?

​      因為通常情況下,POE::Kernel是最先被建立的,它的id号會是1。接下來建立session的id号依次被累加。

​B. 為什麼_start事件在kernel運作之前?

    因為當運作POE::Session->create時就會分派_start事件,是以_start事件句柄的激發是在POE::Kernel運作之前的。

C. 為什麼_strart事件之後的count事件沒有立即處理?    

​    第一個count事件句柄并沒有被立即處理。這是因為該事件被Kernel放入了分派隊列之中。

D. Session停止的原因有哪些?  

​    導緻session停止的原因除了再沒有事件可觸發而之外,外部的終止信号也可以用來停止session。

2.多任務的POE例子

可以将以上的這個計數程式做成多任務的形式,使每一個session将在其自身的heap中儲存累加器。

​各個session的事件被依次傳送到POE::Kernel的事件隊列中,并以先進先出的形式進行處理,以保證這些事件将輪流被執行。

為了示範這個結果,我們将複制以上程式中的session部分,其它部分保持原樣不變。

for ( 1 .. 2 ) {

    POE::Session->create(

        inline_states => {

            _start => \&session_start,

            _stop  => \&session_stop,

            count  => \&session_count,

          }

    );

}

以下便是修改後的程式的運作結果:

  Session 2 has started.

  Session 3 has started.

  Starting POE::Kernel.

  Session 2 has counted to 1.

  Session 3 has counted to 1.

  Session 2 has counted to 2.

  Session 3 has counted to 2.

  Session 2 has counted to 3.

  Session 3 has counted to 3.

  Session 2 has counted to 4.

  Session 3 has counted to 4.

  Session 2 has counted to 5.

  Session 3 has counted to 5.

  Session 2 has counted to 6.

  Session 3 has counted to 6.

  Session 2 has counted to 7.

  Session 3 has counted to 7.

  Session 2 has counted to 8.

  Session 3 has counted to 8.

  Session 2 has counted to 9.

  Session 3 has counted to 9.

  Session 2 has counted to 10.

  Session 2 has stopped.

  Session 3 has counted to 10.

  Session 3 has stopped.

  POE::Kernel's run() method returned.

每一個session是在自身heap中儲存計數資料的,這與我們建立的session執行個體數量無關。

​POE輪次處理每一個事件,每次隻有一個事件句柄被運作。

​當事件句柄運作的時候,POE::Kernel自身也将被中斷,在事件句柄傳回之前,沒有事件被分派。

當各個session的事件被傳送到主程式事件隊列後,位于隊列頭部的事件被首先處理,新來的事件将被放置在隊列的尾部。

​以此保證隊列的輪次處理。

​ POE::Kernek的run方法在最後一個session停止之後傳回。

四.回聲伺服器

最後我們将用IO::Select建立一個非派生的回聲伺服器,然後再利用多個抽象層的概念将它移植到POE上。

1. 一個簡單的select()伺服器

這個非派生的伺服器的原型來自于《Perl Cookbook》中的17.13章節。

​為了保持簡潔并且也是為了更友善于移植到POE上,對其做了一些修改。

​同時為了增加可讀性,還給該伺服器設定一些小的目的和功能。

首先,需要引入所需的子產品并初始化一些資料結構。

#!/usr/bin/perl

use warnings;

use strict;

use IO::Socket;

use IO::Select;

use Tie::RefHash;

my %inbuffer  = ();

my %outbuffer = ();

my %ready = ();

tie %ready, "Tie::RefHash";

接下來,我們要建立一個伺服器socket。為了不阻塞單程序的伺服器,這個socket被設定為非阻塞狀态。

my $server = IO::Socket::INET->new

  ( LocalPort => 12345,

    Listen => 10,

  ) or die "can't make server socket: $@\n";

$server->blocking(0);

然後建立主循環。

​我們制造一個IO::Socket對象用以監視socket上的活動。

​無論何時,當有一個事件發生在socket上,都會有相應的程式來處理它。

my $select = IO::Select->new($server);

while (1) {

    foreach my $client ( $select->can_read(1) ) {

        handle_read($client);

    }

    foreach my $client ( keys %ready ) {

        foreach my $request ( @{ $ready{$client} } ) {

            print "Got request: $request";

            $outbuffer{$client} .= $request;

        }

        delete $ready{$client};

    }

    foreach my $client ( $select->can_write(1) ) {

        handle_write($client);

    }

}

exit;

以上的主循環對整個程式做了一個大緻的總結。

​下面是用于處理socket不同行為的幾個函數:

第一個函數用來處理可讀狀态的socket。

​     如果這個準備就緒的socket是伺服器的socket,我們再接收一個新的連接配接,并将它注冊到IO::Socket對象中。

​     如果這是一個存在輸入資料的用戶端socket,我們讀取資料并對其進行處理,并将處理的結果添加到%ready資料結構中。

​     主循環會捕獲在%ready中的資料,并将它們回傳給用戶端。

sub handle_read {

    my $client = shift;

    if ( $client == $server ) {

        my $new_client = $server->accept();

        $new_client->blocking(0);

        $select->add($new_client);

        return;

    }

    my $data = "";

    my $rv   = $client->recv( $data, POSIX::BUFSIZ, 0 );

    unless ( defined($rv) and length($data) ) {

        handle_error($client);

        return;

    }

    $inbuffer{$client} .= $data;

    while ( $inbuffer{$client} =~ s/(.*\n)// ) {

        push @{ $ready{$client} }, $1;

    }

}

接下來是一個處理可寫狀态的socket的函數。

​  等待被發送到用戶端的資料将被寫到這個socket中,之後被從輸出緩沖中删除。

sub handle_write {

    my $client = shift;

    return unless exists $outbuffer{$client};

    my $rv = $client->send( $outbuffer{$client}, 0 );

    unless ( defined $rv ) {

        warn "I was told I could write, but I can't.\n";

        return;

    }

    if ( $rv == length( $outbuffer{$client} ) or

        $! == POSIX::EWOULDBLOCK

      ) {

        substr( $outbuffer{$client}, 0, $rv ) = "";

        delete $outbuffer{$client} unless length $outbuffer{$client};

        return;

    }

    handle_error($client);

}

最後我們需要一個程式來處理客戶socket在讀取和發送資料時産生的錯誤。

​它會為發生錯誤的socket做一些清理工作,并保證它們被正确關閉。

sub handle_error {

    my $client = shift;

    delete $inbuffer{$client};

    delete $outbuffer{$client};

    delete $ready{$client};

    $select->remove($client);

    close $client;

}

短短130行代碼,我們就有了一個簡單的回聲伺服器。不算太壞,但是我們可以做得更好。

2. 将伺服器移植到POE上

為了把IO::Socket伺服器移植到POE上,需要使用到某些POE的底層特征。

​為了詳細說明的需要,我們竟可能地不省略細節,而最終的程式也将保留其中的大部分代碼。

​事實上,以上的IO::Socket伺服器本身就是由事件驅動的,在其中包含了一個用于檢測并分派之間的主循環,配以處理這些事件的相應事件句柄。

​從這一點上來說,與POE的原理和架構有異曲同工的意思。

新的伺服器程式需要一個如下所示的POE空架構。用于具體功能實作的代碼将被添加到這個架構之中。

#!/usr/bin/perl

use warnings;

use strict;

use POSIX;

use IO::Socket;

use POE;

POE::Session->create

  ( inline_states =>

      {

      }

  );

POE::Kernel->run();

exit;

在繼續完成接下來的程式之前,為了勾勒出程式的大緻架構結構,必須明确哪些事件的出現是必要的。

1)    伺服器啟動,完成初始化。

2)    伺服器socket準備就緒,可以接收連接配接。

3)    客戶socket處于可讀取狀态,伺服器讀取資料并對其進行處理。

4)    客戶socket處于可寫狀态,伺服器對其寫入一些資料。

5)    客戶socket發生錯誤,需要将其關閉。

一旦知道了需要做些什麼,就可以建立這些事件的名稱,并為這些事件編寫相應的事件處理句柄,進而快速地完成POE::Session的構造。

POE::Session->create

  ( inline_states =>

      { _start => \&server_start,

        event_accept => \&server_accept,

        event_read   => \&client_read,

        event_write  => \&client_write,

        event_error  => \&client_error,

      }

  );

現在是真正将IO::Select代碼移植過來的時候了!

​和IO::Select伺服器相同,需要為客戶socket提供輸入和輸出緩沖,

​而且由于這兩個緩沖對socket句柄的重要性并且不存在沖突,它們将被保持為程式的全局變量。

​另外,在這裡将不再使用%ready哈希表。

my %inbuffer  = ();

my %outbuffer = ();

緊接着是引入IO::Select的程式片段,因為每一段都是上面指定的事件所觸發的,是以這些片段将被移植入相應事件的處理句柄中。

在_start事件的處理句柄中,需要建立一個伺服器的監聽socket,并用select_read為其配置設定一個事件發生器。

​句柄中用到的POE::Kernel子產品中的select_read方法接收兩個參數:

​第一個是需要監視的socket,

​第二個是當該socket處于可讀狀态時所觸發的處理句柄。

sub server_start {

    my $server = IO::Socket::INET->new

      ( LocalPort => 12345,

        Listen => 10,

        Reuse  => "yes",

      ) or die "can't make server socket: $@\n";

    $_[KERNEL]->select_read( $server, "event_accept" );

}

注意一點,我們并沒有儲存伺服器socket。

​因為POE::Kernel會對其進行跟蹤,并将其作為一個參數傳遞給event_accept事件句柄。

​隻有在需要特殊用途的情況下,我們才會儲存一個該socket的拷貝。

再回顧POE::Session構造器,事件event_accept會激發server_accept事件句柄。

​該句柄接收一個新的客戶socket,并對其配置設定一個螢幕。

sub server_accept {

    my ( $kernel, $server ) = @_[ KERNEL, ARG0 ];

    my $new_client = $server->accept();

    $kernel->select_read( $new_client, "event_read" );

}

之後我們在client_read句柄中處理處理來自客戶的資料。

​當新連接配接的客戶socket處于可讀狀态時,句柄被觸發。

​該句柄中的第一個客戶參數即為所連接配接的客戶socket,是以我們無需再為其保留一個拷貝。

在内容上,句柄client_read與IO::Select伺服器中的handle_read幾乎一樣。

​而由于handle_read的accept部分的内容被移植到了server_accept句柄中,相應地就不再需要%ready哈希表了。

如果接收過程發生錯誤,客戶socket通過POE::Kernel的yield方法将被傳送到event_error句柄中。

​因為該yield方法是根據程式員的具體要求發送事件的,是以需要在yield中将發生錯誤的客戶socket作為某個參數,

​而此socket在處理該event_error的事件句柄client_error中被指派給$_[ARG0]。

接着,如果在client_read中發現輸出緩存存在資料,

​我們将檢測以保證當該客戶socket處于可寫狀态時,及時觸發事件處理句柄,将資料發送出去。

sub client_read {

    my ( $kernel, $client ) = @_[ KERNEL, ARG0 ];

    my $data = "";

    my $rv   = $client->recv( $data, POSIX::BUFSIZ, 0 );

    unless ( defined($rv) and length($data) ) {

        $kernel->yield( event_error => $client );

        return;

    }

    $inbuffer{$client} .= $data;

    while ( $inbuffer{$client} =~ s/(.*\n)// ) {

        $outbuffer{$client} .= $1;

    }

    if ( exists $outbuffer{$client} ) {

        $kernel->select_write( $client, "event_write" );

    }

}

在用于發送資料的事件句柄中,第一個客戶參數依然是一個可用的socket。

​在該句柄中,如果輸出緩存為空,則停止檢測并迅速傳回。

​否則,我們将試圖将緩沖内的資料全部發出。如果所有資料均發送成功,該緩沖将被銷毀。

​與client_read類似,client_write中也有相應的錯誤處理句柄。

sub client_write {

    my ( $kernel, $client ) = @_[ KERNEL, ARG0 ];

    unless ( exists $outbuffer{$client} ) {

        $kernel->select_write($client);

        return;

    }

    my $rv = $client->send( $outbuffer{$client}, 0 );

    unless ( defined $rv ) {

        warn "I was told I could write, but I can't.\n";

        return;

    }

    if ( $rv == length( $outbuffer{$client} ) or

        $! == POSIX::EWOULDBLOCK

      ) {

        substr( $outbuffer{$client}, 0, $rv ) = "";

        delete $outbuffer{$client} unless length $outbuffer{$client};

        return;

    }

    $kernel->yield( event_error => $client );

}

最後說明一下在以上兩個句柄中被用到的錯誤處理句柄。

​我們首先删除了客戶socket的輸入輸出緩存,再關閉建立在該socket上的所有監視,最後保證該socket被成功關閉。

​如此這般便有效地關閉了來自于客戶的連接配接。

sub client_error {

    my ( $kernel, $client ) = @_[ KERNEL, ARG0 ];

    delete $inbuffer{$client};

    delete $outbuffer{$client};

    $kernel->select($client);

    close $client;

}

移植成功!

原文連結: http://www.cnitblog.com/gyn/archive/2008/05/09/43537.html