一.前言
至于POE的應用,我不想多說什麼,因為需要使用狀态機的地方太多。
舉一個極端的例子,windows下的perl-tk對于多線程的支援極不穩定,如果在其中加入一個大資料量的處理應用,
結果往往會是一個無法動彈的程式。這時除了使用POE,也許沒有更好的解決辦法了。
另外,python中有叫twisted的類似架構,被廣泛地應用在網絡服務中,
具體的使用方法可以參考Oreilly出版的《Python Twisted Network Programming Essentials》。
某種意義上,也說明了狀态機的重要性。
二.基本原理與概念
在詳細說明POE伺服器的建立步驟之前,需要對POE的原理做一個大緻的了解。
在這個階段中,我們将描述這方面的不同概念,以及它們在POE中的應用。
1. 事件與事件句柄
POE 是一個為網絡工作和并行任務服務的事件驅動架構。
首先作為前提,需要掌握事件和事件驅動程式設計的意義。
在抽象意義上,事件就是真實世界中發生的一件事情。
比如說:早晨打鈴、面包從烤機裡彈出、茶煮好了等。而在使用者界面上最常見的事件則是滑鼠移動、按鈕點選和鍵盤敲打等等。
具體到程式軟體事件,則往往是一些抽象的事件。
也就是說,它不僅包括了發送給程式的外部活動,而且也包括了一些在作業系統内部運作的事件。
比如說,計時器到點了,socket建立了連接配接,下載下傳完成等。
在事件驅動程式中,中心配置設定器的作用是将事件配置設定給不同的處理程式。
這些處理程式就是事件句柄,顧名思義,它們的任務就是處理相應事件。
POE的事件句柄之間的關系是合作性質的。
沒有兩個句柄會同時運作,每一個句柄在被激發運作期間将獨占程式。
通過盡可能快地傳回來保證程式的其它部分得以順暢運作,這就是事件句柄之間的合作方式。
2. POE程式的組成部分
最簡單的POE程式包括兩個子產品和一些使用者代碼:它們分别是POE::Kernel,POE::Session以及一些事件句柄。
a. POE::Kernel:
POE::Kernel提供了基于事件的作業系統核心服務。
包括I/O事件、警報和其它計時事件、信号事件和一些不被人意識到的事件。
POE::Kernel提供不同的方法對這些事件進行設定,比如select_read(), delay()和sig()。
POE::Kernel還能夠跟蹤事件發生源和與之相關的任務之間的關系。
之是以能夠這麼做,是因為當事件發生時,它将跟蹤哪個任務被激活了。
于是它便知道了哪個任務調用方法來使用了這些資源,而這些都是自動完成的。
POE::Kernel也知道何事需将任務銷毀。它檢測任務以确定是否還有事件需要處理,或者是哪個事需要釋放占用的資源。當任務沒有事件可以觸發的時候,POE::Kernel就自動銷毀該資源。
POE::Kernel會在最後一個session停止以後終止運作。
b. POE::Session:
POE::Session執行個體就是上面所講的由POE::Kernel管理的“任務”。(以下的章節中為了便于識别将使用“session”)
每一個session都有一個自己私有的存儲空間,叫“heap”。存儲在目前session的heap中的資料很難被一個外部session得到。
每一個session還擁有自己的資源和事件句柄。這些資源為擁有它們的session生成事件,而事件隻被指派到其所處的session中。
舉例說明,有多個session都可以設定相同的警報,并且任何一個都能接受其所請求的計時事件。
但所有其他session不會在意發生在它們之外的事情。
c. 事件句柄:
事件句柄就是Perl程式。
它們因為使用了POE::Kernel傳遞的參數而不同于一般的perl程式。
POE::Kernel是通過@_來傳遞參數。該數組的前七個成員定義了發生該事件的session的上下文。
它包括了一個指向POE::Kernel運作執行個體的引用、事件自身的名字、指向私有heap的引用以及指向發出事件的session的引用。
@_中剩下的成員屬于事件自身,其中的具體内容依照被指派的事件類型而定。
舉例說明:對于I/O事件,包括兩個參數:一個是緩沖檔案句柄,另一個是用來說明采取何種行為(input、output或者異常)的标記。
POE不強求程式員為每一個事件句柄配置設定所有的參數,要不然這将變成一件非常煩人的工作,因為它們中的一些參數是不常被用到的。
而POE::Session會自動為@_輸出剩餘的常量,這樣就能使我們相對比較輕松地将注意力放在重要的參數上,而讓POE來處理不必需的參數。
POE還允許改變參數的順序和數量,而不會對程式造成影響。
比如說,KERNEL,HEAP和ARG0分别是 POE::Kernel執行個體、目前session的堆棧和事件的第一個使用者參數。
它們可以一個個直接從@_被導出。
my $kernel = $_[KERNEL];
my $heap = $_[HEAP];
my $thingy = $_[ARG0];
或者一次性以隊列片段的形式指派給程式參數。
my ( $kernel, $heap, $thingy ) = @_[KERNEL, HEAP, ARG0];
當然在事件句柄中我們也可以直接使用$_[KERNEL],$_[HEAP]和$_[AG0]。
但是因為諸如ARG0的參數很難從字面上知道它在事件中代表的真實意義, 是以我們不提倡直接使用這種做法。
三.簡單的POE例子
現在大緻知道了POE程式設計的概念,我們将舉若幹例子來了解它到底是怎麼運作的。
1. 一個單session的例子
簡單的POE程式包括三個部分:
一個用來加載子產品和配置條件的前端,
初始化并且運作一個或者多個session的主體
和用來描述事件句柄的具體程式。
a. 前端:
#!/usr/bin/perl
use warnings;
use strict;
use POE;
引入POE子產品的過程的背後隐藏了一些細節,
事實上這麼做還加載了一些諸如POE::Kernel和POE:Sesson的子產品并相應地做了一些初始化,而通常在每個POE程式中我們都會用到這些隐藏子產品。
在POE::Kernel第一次被引入時,它生成了一個将貫穿整個程式POE::Kernel執行個體。
POE::Session會根據不同的事件輸出預設常量給事件句柄的某些參數,如:KERNEL,HEAP,ARG0等等。
是以一個簡單的use POE為程式做了大量的初始化工作。
b. 主體session:
當所有的條件都準備好之後,為了保證POE::Kernel的有效運作,我們必須建立至少一個session。不然的話,運作程式意味着無事可做。
在這個例子裡,我們将建立一個包含_start,_stop和count這三個事件的任務。
POE::Session将每個事件與一個句柄聯系在一起。
POE::Session->create{
Inline_states => {
_start => \&session_start,
_stop => \&session_stop,
count => \&session_count,
}
};
前兩個事件是由POE::Kernel自身所提供的。它們分别表示該session的啟動和銷毀。
最後一個事件是使用者自定義事件,它被用于程式的邏輯之中。
我們之是以沒有為session保留一個引用的原因是因為該session會自動被注冊到POE::Kernel中,并接收它的管理,
而我們在程式是很少直接使用該session的。
事實上,儲存一個session的應用是存在危險的。
因為如果存在顯式的引用,Perl将不會自動銷毀session對象或者重新為其配置設定記憶體。
接着我們啟動POE::Kernel,由此便建立了一個用來探測并分派事件的主循環。
在此示例程式中,為了使運作結果更加明确,我們将注明POE::Kernel運作的開始處和結束點。
print “Starting POE::Kernel.\n”;
POE::Kernel->run();
print “POE::Kernle’s run method returned.\n”;
exit;
Kernel的run方法隻有在所有session傳回之後才會停止循環。
之後,我們調用一個表示程式結束的提示符的exit系統方法來表示程式被終止,而在實際的應用中這麼做是不必要的。
c. 事件句柄:
下面我們來了解一下事件句柄的應用,首先從_start開始。
_start的句柄将在sesson初始化完成之後開始運作,session在其自身的上下文中使用它來實作輸入引導。
比如初始化heap中的值,或者配置設定一些必要的資源等等。
在該句柄中我們建立了一個累加器,并且發出了“count”事件以觸發相應的事件句柄。
sub session_start {
print "Session ", $_[SESSION]->ID, " has started.\n";
$_[HEAP]->{count} = 0;
$_[KERNEL]->yield("count");
}
一些熟悉線程程式設計的人可能會對yield方法在這裡的使用産生困惑。
事實上,它并不是用來中止session運作的,而是将一個事件放入fifo分派隊列的末尾處,
當隊列中在其之前的事件被處理完畢之後,該事件将被觸發以運作相應的事件句柄。
這個概念在多任務環境下更容易被了解。我們可以通過在調用yield方法之後立即傳回的辦法,來清晰地展現yield方法的行為。
接下來的是_stop句柄。POE::Kernel将在所有session再無事件可觸發之後,并且是在自身被銷毀之前調用它。
sub session_stop {
print "Session ", $_[SESSION]->ID, " has stopped.\n";
}
在_stop中設定一個事件是無用的。
銷毀session的過程本身包括清理與之相關的資源,而事件就是資源的組成部分。
是以對于所有在_stop中的事件,在其能夠被分派之前都是将被清理的。
最後講一下count事件句柄。
該函數用來增加heap中的累加器計數,并列印累加結果。我們可以使用一個while來完成這件工作,
但是用yield方法一來可以使得程式更短小精悍,二來還能夠加深對POE事件處理原理的了解。
sub session_count {
my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];
my $session_id = $_[SESSION]->ID;
my $count = ++$heap->{count};
print "Session $session_id has counted to $count.\n";
$kernel->yield("count") if $count < 10;
}
該函數的最後一句表示:
隻要累加器計數未超過10,session将再yield一個count事件。
因為不斷地觸發了session_count句柄,使得目前session可以繼續得以生存而不會被POE::Kernel清理。
當計數器到10時,便不再調用yield指令,session也将停止。
一旦POE::Kernel檢測到該session再沒有事件句柄可被激發,便在調用_stop事件句柄之後将其清理銷毀。
以下是運作的結果:
Session 2 has started.
Starting POE::Kernel.
Session 2 has counted to 1.
Session 2 has counted to 2.
Session 2 has counted to 3.
Session 2 has counted to 4.
Session 2 has counted to 5.
Session 2 has counted to 6.
Session 2 has counted to 7.
Session 2 has counted to 8.
Session 2 has counted to 9.
Session 2 has counted to 10.
Session 2 has stopped.
POE::Kernel's run() method returned.
對于該結果有幾點需要解釋一下。
A. 為什麼在運作結果中session的id是2?
因為通常情況下,POE::Kernel是最先被建立的,它的id号會是1。接下來建立session的id号依次被累加。
B. 為什麼_start事件在kernel運作之前?
因為當運作POE::Session->create時就會分派_start事件,是以_start事件句柄的激發是在POE::Kernel運作之前的。
C. 為什麼_strart事件之後的count事件沒有立即處理?
第一個count事件句柄并沒有被立即處理。這是因為該事件被Kernel放入了分派隊列之中。
D. Session停止的原因有哪些?
導緻session停止的原因除了再沒有事件可觸發而之外,外部的終止信号也可以用來停止session。
2.多任務的POE例子
可以将以上的這個計數程式做成多任務的形式,使每一個session将在其自身的heap中儲存累加器。
各個session的事件被依次傳送到POE::Kernel的事件隊列中,并以先進先出的形式進行處理,以保證這些事件将輪流被執行。
為了示範這個結果,我們将複制以上程式中的session部分,其它部分保持原樣不變。
for ( 1 .. 2 ) {
POE::Session->create(
inline_states => {
_start => \&session_start,
_stop => \&session_stop,
count => \&session_count,
}
);
}
以下便是修改後的程式的運作結果:
Session 2 has started.
Session 3 has started.
Starting POE::Kernel.
Session 2 has counted to 1.
Session 3 has counted to 1.
Session 2 has counted to 2.
Session 3 has counted to 2.
Session 2 has counted to 3.
Session 3 has counted to 3.
Session 2 has counted to 4.
Session 3 has counted to 4.
Session 2 has counted to 5.
Session 3 has counted to 5.
Session 2 has counted to 6.
Session 3 has counted to 6.
Session 2 has counted to 7.
Session 3 has counted to 7.
Session 2 has counted to 8.
Session 3 has counted to 8.
Session 2 has counted to 9.
Session 3 has counted to 9.
Session 2 has counted to 10.
Session 2 has stopped.
Session 3 has counted to 10.
Session 3 has stopped.
POE::Kernel's run() method returned.
每一個session是在自身heap中儲存計數資料的,這與我們建立的session執行個體數量無關。
POE輪次處理每一個事件,每次隻有一個事件句柄被運作。
當事件句柄運作的時候,POE::Kernel自身也将被中斷,在事件句柄傳回之前,沒有事件被分派。
當各個session的事件被傳送到主程式事件隊列後,位于隊列頭部的事件被首先處理,新來的事件将被放置在隊列的尾部。
以此保證隊列的輪次處理。
POE::Kernek的run方法在最後一個session停止之後傳回。
四.回聲伺服器
最後我們将用IO::Select建立一個非派生的回聲伺服器,然後再利用多個抽象層的概念将它移植到POE上。
1. 一個簡單的select()伺服器
這個非派生的伺服器的原型來自于《Perl Cookbook》中的17.13章節。
為了保持簡潔并且也是為了更友善于移植到POE上,對其做了一些修改。
同時為了增加可讀性,還給該伺服器設定一些小的目的和功能。
首先,需要引入所需的子產品并初始化一些資料結構。
#!/usr/bin/perl
use warnings;
use strict;
use IO::Socket;
use IO::Select;
use Tie::RefHash;
my %inbuffer = ();
my %outbuffer = ();
my %ready = ();
tie %ready, "Tie::RefHash";
接下來,我們要建立一個伺服器socket。為了不阻塞單程序的伺服器,這個socket被設定為非阻塞狀态。
my $server = IO::Socket::INET->new
( LocalPort => 12345,
Listen => 10,
) or die "can't make server socket: $@\n";
$server->blocking(0);
然後建立主循環。
我們制造一個IO::Socket對象用以監視socket上的活動。
無論何時,當有一個事件發生在socket上,都會有相應的程式來處理它。
my $select = IO::Select->new($server);
while (1) {
foreach my $client ( $select->can_read(1) ) {
handle_read($client);
}
foreach my $client ( keys %ready ) {
foreach my $request ( @{ $ready{$client} } ) {
print "Got request: $request";
$outbuffer{$client} .= $request;
}
delete $ready{$client};
}
foreach my $client ( $select->can_write(1) ) {
handle_write($client);
}
}
exit;
以上的主循環對整個程式做了一個大緻的總結。
下面是用于處理socket不同行為的幾個函數:
第一個函數用來處理可讀狀态的socket。
如果這個準備就緒的socket是伺服器的socket,我們再接收一個新的連接配接,并将它注冊到IO::Socket對象中。
如果這是一個存在輸入資料的用戶端socket,我們讀取資料并對其進行處理,并将處理的結果添加到%ready資料結構中。
主循環會捕獲在%ready中的資料,并将它們回傳給用戶端。
sub handle_read {
my $client = shift;
if ( $client == $server ) {
my $new_client = $server->accept();
$new_client->blocking(0);
$select->add($new_client);
return;
}
my $data = "";
my $rv = $client->recv( $data, POSIX::BUFSIZ, 0 );
unless ( defined($rv) and length($data) ) {
handle_error($client);
return;
}
$inbuffer{$client} .= $data;
while ( $inbuffer{$client} =~ s/(.*\n)// ) {
push @{ $ready{$client} }, $1;
}
}
接下來是一個處理可寫狀态的socket的函數。
等待被發送到用戶端的資料将被寫到這個socket中,之後被從輸出緩沖中删除。
sub handle_write {
my $client = shift;
return unless exists $outbuffer{$client};
my $rv = $client->send( $outbuffer{$client}, 0 );
unless ( defined $rv ) {
warn "I was told I could write, but I can't.\n";
return;
}
if ( $rv == length( $outbuffer{$client} ) or
$! == POSIX::EWOULDBLOCK
) {
substr( $outbuffer{$client}, 0, $rv ) = "";
delete $outbuffer{$client} unless length $outbuffer{$client};
return;
}
handle_error($client);
}
最後我們需要一個程式來處理客戶socket在讀取和發送資料時産生的錯誤。
它會為發生錯誤的socket做一些清理工作,并保證它們被正确關閉。
sub handle_error {
my $client = shift;
delete $inbuffer{$client};
delete $outbuffer{$client};
delete $ready{$client};
$select->remove($client);
close $client;
}
短短130行代碼,我們就有了一個簡單的回聲伺服器。不算太壞,但是我們可以做得更好。
2. 将伺服器移植到POE上
為了把IO::Socket伺服器移植到POE上,需要使用到某些POE的底層特征。
為了詳細說明的需要,我們竟可能地不省略細節,而最終的程式也将保留其中的大部分代碼。
事實上,以上的IO::Socket伺服器本身就是由事件驅動的,在其中包含了一個用于檢測并分派之間的主循環,配以處理這些事件的相應事件句柄。
從這一點上來說,與POE的原理和架構有異曲同工的意思。
新的伺服器程式需要一個如下所示的POE空架構。用于具體功能實作的代碼将被添加到這個架構之中。
#!/usr/bin/perl
use warnings;
use strict;
use POSIX;
use IO::Socket;
use POE;
POE::Session->create
( inline_states =>
{
}
);
POE::Kernel->run();
exit;
在繼續完成接下來的程式之前,為了勾勒出程式的大緻架構結構,必須明确哪些事件的出現是必要的。
1) 伺服器啟動,完成初始化。
2) 伺服器socket準備就緒,可以接收連接配接。
3) 客戶socket處于可讀取狀态,伺服器讀取資料并對其進行處理。
4) 客戶socket處于可寫狀态,伺服器對其寫入一些資料。
5) 客戶socket發生錯誤,需要将其關閉。
一旦知道了需要做些什麼,就可以建立這些事件的名稱,并為這些事件編寫相應的事件處理句柄,進而快速地完成POE::Session的構造。
POE::Session->create
( inline_states =>
{ _start => \&server_start,
event_accept => \&server_accept,
event_read => \&client_read,
event_write => \&client_write,
event_error => \&client_error,
}
);
現在是真正将IO::Select代碼移植過來的時候了!
和IO::Select伺服器相同,需要為客戶socket提供輸入和輸出緩沖,
而且由于這兩個緩沖對socket句柄的重要性并且不存在沖突,它們将被保持為程式的全局變量。
另外,在這裡将不再使用%ready哈希表。
my %inbuffer = ();
my %outbuffer = ();
緊接着是引入IO::Select的程式片段,因為每一段都是上面指定的事件所觸發的,是以這些片段将被移植入相應事件的處理句柄中。
在_start事件的處理句柄中,需要建立一個伺服器的監聽socket,并用select_read為其配置設定一個事件發生器。
句柄中用到的POE::Kernel子產品中的select_read方法接收兩個參數:
第一個是需要監視的socket,
第二個是當該socket處于可讀狀态時所觸發的處理句柄。
sub server_start {
my $server = IO::Socket::INET->new
( LocalPort => 12345,
Listen => 10,
Reuse => "yes",
) or die "can't make server socket: $@\n";
$_[KERNEL]->select_read( $server, "event_accept" );
}
注意一點,我們并沒有儲存伺服器socket。
因為POE::Kernel會對其進行跟蹤,并将其作為一個參數傳遞給event_accept事件句柄。
隻有在需要特殊用途的情況下,我們才會儲存一個該socket的拷貝。
再回顧POE::Session構造器,事件event_accept會激發server_accept事件句柄。
該句柄接收一個新的客戶socket,并對其配置設定一個螢幕。
sub server_accept {
my ( $kernel, $server ) = @_[ KERNEL, ARG0 ];
my $new_client = $server->accept();
$kernel->select_read( $new_client, "event_read" );
}
之後我們在client_read句柄中處理處理來自客戶的資料。
當新連接配接的客戶socket處于可讀狀态時,句柄被觸發。
該句柄中的第一個客戶參數即為所連接配接的客戶socket,是以我們無需再為其保留一個拷貝。
在内容上,句柄client_read與IO::Select伺服器中的handle_read幾乎一樣。
而由于handle_read的accept部分的内容被移植到了server_accept句柄中,相應地就不再需要%ready哈希表了。
如果接收過程發生錯誤,客戶socket通過POE::Kernel的yield方法将被傳送到event_error句柄中。
因為該yield方法是根據程式員的具體要求發送事件的,是以需要在yield中将發生錯誤的客戶socket作為某個參數,
而此socket在處理該event_error的事件句柄client_error中被指派給$_[ARG0]。
接着,如果在client_read中發現輸出緩存存在資料,
我們将檢測以保證當該客戶socket處于可寫狀态時,及時觸發事件處理句柄,将資料發送出去。
sub client_read {
my ( $kernel, $client ) = @_[ KERNEL, ARG0 ];
my $data = "";
my $rv = $client->recv( $data, POSIX::BUFSIZ, 0 );
unless ( defined($rv) and length($data) ) {
$kernel->yield( event_error => $client );
return;
}
$inbuffer{$client} .= $data;
while ( $inbuffer{$client} =~ s/(.*\n)// ) {
$outbuffer{$client} .= $1;
}
if ( exists $outbuffer{$client} ) {
$kernel->select_write( $client, "event_write" );
}
}
在用于發送資料的事件句柄中,第一個客戶參數依然是一個可用的socket。
在該句柄中,如果輸出緩存為空,則停止檢測并迅速傳回。
否則,我們将試圖将緩沖内的資料全部發出。如果所有資料均發送成功,該緩沖将被銷毀。
與client_read類似,client_write中也有相應的錯誤處理句柄。
sub client_write {
my ( $kernel, $client ) = @_[ KERNEL, ARG0 ];
unless ( exists $outbuffer{$client} ) {
$kernel->select_write($client);
return;
}
my $rv = $client->send( $outbuffer{$client}, 0 );
unless ( defined $rv ) {
warn "I was told I could write, but I can't.\n";
return;
}
if ( $rv == length( $outbuffer{$client} ) or
$! == POSIX::EWOULDBLOCK
) {
substr( $outbuffer{$client}, 0, $rv ) = "";
delete $outbuffer{$client} unless length $outbuffer{$client};
return;
}
$kernel->yield( event_error => $client );
}
最後說明一下在以上兩個句柄中被用到的錯誤處理句柄。
我們首先删除了客戶socket的輸入輸出緩存,再關閉建立在該socket上的所有監視,最後保證該socket被成功關閉。
如此這般便有效地關閉了來自于客戶的連接配接。
sub client_error {
my ( $kernel, $client ) = @_[ KERNEL, ARG0 ];
delete $inbuffer{$client};
delete $outbuffer{$client};
$kernel->select($client);
close $client;
}
移植成功!
原文連結: http://www.cnitblog.com/gyn/archive/2008/05/09/43537.html