天天看點

POE::Wheel::Run示例詳解

一、POE簡介

perl的POE子產品是一個事件驅動的有限狀态機程式設計架構,

它用一個單程序模拟多線程的多任務執行子產品,

并提供的非阻塞的IO操作(就像C語言下面的libevent庫)。

它能幹的事情很多,并且寫起來代碼很清晰,可讀性非常好。

POE最重要的一點是,它嘗試着把在事件驅動環境中程式設計的惱人細節給隐藏起來。

POE的組成部分包括:

states(狀态),kernel(核心),session(任務),driver(驅動),filter,wheel和component等。

states: 就是有限狀态機中的狀态,

        對應着的就是處理這些狀态的句柄--也就是函數。

kernel: 是POE的核心子產品.

        POE的核心與OS的核心很相似: 它會跟蹤背景的所有程序和資料,并排程運作的代碼。

        可以用核心來設定POE程序的時鐘,将要運作的狀态進行排隊,和執行各種低級服務,

        但在很多時候,并不需要直接和核心進行互動。

session:它等同于OS中的程序。

        session是從一個狀态切換到另一個狀态時運作的POE程式。

        它可以建立子session, 發送POE事件給其它session等。

        每個session都有一個名為head的哈希表,可以用來存儲本session的私有資料,

        并且該session中的每個狀态處理函數都可通路它。

        POE是一個很精簡的多任務協作模型;

        所有的session都是在同個OS程序中執行,其本身上不是多線程或多程序的。

        是以,在POE程式設計中,盡量不要使用阻塞式的系統調用,否則會導緻整個POE運作的阻塞。

driver: 是POE的I/O層的最低層。

        現在,POE的釋出版本中隻有一個driver -- POE::Driver::SysRW.

        它可以從檔案句柄中讀寫資料.

filter: 它是将格式化的資料塊轉換成另一種格式的接口。

        例如,POE::Filter::HTTPD将HTTP 1.0的請求轉換成HTTP::Request對象并傳回;

        POE::Filter::Line将原始資料流轉換成多行(就像Perl的<>操作)。

wheel:  它是處理任務的進階邏輯的可重用代碼片斷。

        它們以POE的方式将有用的代碼做了封裝。

        在POE中,通常使用wheel做的事情包括處理事件驅動的輸入輸出,易用的網絡連接配接。

        Wheel通常使用Filter和Driver來通知和發送資料。

component: 它就是一個封裝好了的可重用的session,可以被别的session控制。

        其它的session可以發送指令給它,也可以從它那接收事件,很像OS中的使用IPC進行程序通信。

        Component的例子包括,

        POE::Component::IRC, 用來建立基于POE的IRC用戶端的接口;

        POE::Component::HTTP, Perl中由事件驅動的使用者代碼。

二、POE::Wheel簡介

POE::Wheel是封裝好了的用來執行特定任務的事件句柄包,

它負責管理激活對應事件處理句柄的事件監測器。

在POE::Wheel建立時,它會添加匿名的事件句柄給調用它的session。

是以,當session建立wheel時,就需要處理這些新的事件。

常用的Wheel有如下:

POE::Wheel::Curses        - Non-blocking input for Curses.

POE::Wheel::FollowTail    - Non-blocking file and FIFO monitoring.

POE::Wheel::ListenAccept  - Non-blocking server for existing sockets.

POE::Wheel::ReadLine      - Non-blocking console input, with full readline support.

POE::Wheel::ReadWrite     - Non-blocking stream I/O.

POE::Wheel::Run           - Non-blocking process creation and management.

POE::Wheel::SocketFactory - Non-blocking socket creation, supporting most protocols and modes.

三、POE::Wheel::Run簡介

是一個非阻塞的程序建立和管理器。

因為POE本身不是多線程或多程序的,是以要建立子程序以并行處理時就要用到它啦。

父程序和子程序可以通過子程序的檔案句柄STDIN, STDOUT和STDERR進行通信。

在父程序中,用POE::Wheel::Run對象表示子程序,

可以使用這個對象的PID()和kill()方法來查詢和管理子程序。

父程序使用put()方法發送資料到子程序STDIN,

子程序的STDOUT和STDERR将會以事件方式發送給父程序。

POE::Wheel::Run對象同樣可以在子程序關閉其輸出檔案句柄時通知父程序。

但父程序監視子程序退出更可靠的方式是使用:

 POE::Kernel的sig_child();

它會等待wheel的子程序退出并回收。

而且最好在所有的環境下都要用sig_child(),

不然,POE不會回收子程序。進而導緻程序的洩漏,對于長時運作的程式來說,這是緻命的。

對于這種情況,POE::Kernel在退出時會有warning提示。

POE::Wheel::Run的對象和子程序的通信,預設地是基于行的。

在程式開發時,可以通過使用POE::Filter對象來覆寫

"StdinFilter","StdoutFilter","StdioFilter"和"StderrFilter"事件句柄,

實作更多樣的資料處理方式。

四、示例程式

本例是一很簡單的示例程式,主要是用來說明整個開發流程。

程式實作的是用子程序打開檔案,讀取三行,

并在父程序中顯示出這三行的内容。

閑話少說,正式開始程式:

1. 主程式

1  #!/usr/bin/perl -w

2

3  ###############################################################################

4  # \File

5  #   test_poe_wheel_run.pl

6  # \Descript

7  #   test POE::Wheel::Run

8  # \Author

9  #   Hank

10 # \Created date

11 #   2013-4-25

12 ###############################################################################

13 use strict;

14 use POE qw(Wheel::Run

15     Filter::Reference);

16

17 my $filename = "test_file.ini";

18 my $line_num = 3;

19

20 our $task_pid = 1;

21 $SIG{TERM} = $SIG{INT} = sub {

22 kill KILL => $task_pid;

23 exit;

24 };

25

26 #

27 # Main process

28 #

29 POE::Session->create(

30   inline_states => {

31     _start      => \&start_task,

32     _stop       => \&handle_task_shutdown,

33     task_result => \&handle_task_result,

34     task_done   => \&handle_task_done,

35     task_debug  => \&handle_task_debug,

36     sig_child   => \&handle_sig_child,

37   },

38   args => [$filename, $line_num],

39 );

40

41 $poe_kernel->run();

42

43 exit;

Line14,15:

  導入要用的POE子產品,qw()聲明是Perl用來一次性加載多個POE子產品的簡寫方式,

  它和下面的代碼等同:

    use POE;

    use POE::Wheel::Run;

    use POE::Filter::Reference;

Line17,18:

  定義了要讀取的檔案和行數,這麼寫是為了示例如何進行參數傳遞。

Line20~24:

  這段代碼在POE::Wheel::Run開發中很有必要,它的作用是在整個POE程序用

  Ctrl+c或退出時kill掉子程序。

  否則會造成子程序失去父親,被系統的init接管。

Line29:

  POE的session建立函數。

Line30~37:

  向POE注冊本程式要處理的狀态,以及這些狀态對應的狀态處理句柄。

  以"_"開始的"_start","_stop"是POE::Kernel預設的狀态,分别用于session的啟動和銷毀。

  sig_child注冊的POE::Kernel對子程序退出後,對其回收的狀态處理句柄。

  其它三個狀态都是自定義狀态及其處理句柄。

Line38:

  是将參數傳遞給session的啟動狀态對應的處理函數start_task().

  POE使用了一個傳遞參數的特别方式:

  它将數組@_封裝了很多額外的參數 -- 分别是:

     目前核心, session的引用, 狀态名, Heap的引用,以及ARG0 ~ ARG9;

  要通路它們,可以使用數組@_加下标KERNEL, SESSION, STATE, HEAP, ARG0 ~ ARG9。

  這樣的設計是為了最大化的提高運作速度。

  是以,在POE中的參數或資料在狀态處理函數間的傳遞有兩種方式:

    HEAP哈希,或ARG0~ARG9.

  Line38用的就是後一種,[$filename, $line_num]對應指派到ARG0和ARG1。

Line41~43:

  啟動POE::Kernel,由此便建立了一個用來探測并分派事件的主循環。整個程式就運轉起來了。

  且run方法隻有在所有session傳回之後才會停止循環。

  之後,我們調用一個表示程式結束的提示符的exit系統方法來表示程式被終止。

  POE輪次處理每一個事件,每次隻有一個事件句柄被運作。

  當事件句柄運作的時候,POE::Kernel自身也将被中斷,在事件句柄傳回之前,沒有事件被分派。

  當各個session的事件被傳送到主程式事件隊列後,位于隊列頭部的事件被首先處理,

  新來的事件将被放置在隊列的尾部。以此保證隊列的輪次處理。

  POE::Kernek的run方法在最後一個session停止之後傳回。

整個主程式的執行順序是如下:

先建立session,

session建立完成後就發送第一個事件_start;

之後啟動POE的kernel。

此時事件隊列中已有一個_start事件,是以進入_start事件對應的狀态處理函數start_task()。

2. _start狀态處理函數

44 #

45 # sub-functions

46 #

47 sub start_task {

48   my ($kernel, $heap) = @_[KERNEL, HEAP];

49   my ($fname, $lnum)  = @_[ARG0..ARG1];

50   my @params = ($fname, $lnum); 

51

52   my $task = POE::Wheel::Run->new(

53     Program      => sub { task_stuff(@params) },

54     StdoutEvent  => "task_result",

55     StderrEvent  => "task_debug",

56     CloseEvent   => "task_done",

57   );

58

59   $kernel->sig_child($task->PID, "sig_child");

60   $task_pid = $task->PID;

61

62   # Wheel events include the wheel's ID.

63   $heap->{children_by_wid}{$task->ID} = $task;

64

65   # Signal events include the process ID.

66   $heap->{children_by_pid}{$task->PID} = $task;

67

68   print("Child pid ", $task->PID," started as wheel ", $task->ID, ".\n");

69 }

Line49:

  取得Line38經ARG0,ARG1傳遞的參數。

Line50:

  将變量打包到數組,以傳給子程序。

Line52~57:

  建立POE::Wheel::Run對象,

  并将要處理的事件映射到session的state,以事件發生時觸發相應的狀态處理函數。

Line53:

  是指定子程序将要執行程式,子程序是以exec()方式運作.

  If Program holds a scalar, its value will be executed as exec($program). 

     Shell metacharacters are significant, per exec(SCALAR) semantics.

  If Program holds an array reference, it will executed as exec(@$program). 

     As per exec(ARRAY), shell metacharacters will not be significant.

  If Program holds a code reference, that code will be called in the child process  

Line59:

  指定程序退出時用來回收的事件。

Line62~66:

  将子程序的程序号和Wheel的ID以哈希的方式存儲在HEAP中,以用于本session的其它狀态處理函數。

3. _stop狀态處理函數

71 sub handle_task_shutdown {

72   my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];

73   my $task_id = $_[ARG0];

74

75   ## delete all wheels.

76   delete $heap->{wheel};

77

78   ## clear your alias

79   #$kernel->alias_remove($heap->{alias});

80

81   ## clear all alarms you might have set

82   #$kernel->alarm_remove_all();

83

84   return;

85 }

如代碼所示,在session退出時進行各種資源的回收。

4. 子程序STDOUT,STDERR及程序結束事件觸發的狀态處理函數

87 sub handle_task_result {

88   my ($stdout_line, $wheel_id) = @_[ARG0, ARG1];

89   my $child = $_[HEAP]{children_by_wid}{$wheel_id};

90

91   print "pid ", $child->PID, " STDOUT: $stdout_line\n";

92 }

93

94 sub handle_task_debug {

95   my $result = $_[ARG0];

96 }

97

98 sub handle_task_done {

99   my ($kernel, $heap, $task_id) = @_[KERNEL, HEAP, ARG0];

100  delete $heap->{task}->{$task_id};

101}

Line87~92:

  StdoutEvent由子程序的列印到STDOUT觸發。

  它帶有兩個參數:

    ARG0: 子程序寫到STDOUT的資訊;

    ARG1: 讀取這個輸出的Wheel的ID号;

Line94~96:

  StderrEvent和StdoutEvent的處理方式一樣,在子程序寫STDERR時觸發。

  它帶有兩個參數:

    ARG0: 子程序寫到STDERR的資訊;

    ARG1: 讀取這個輸出的Wheel的ID号;

Line98~101:

  CloseEvent在子程序關閉它最後一次打開的檔案句柄時觸發,

  但它不是子程序結束時的信号。對于就種情況使用sig_child()來處理。

  在CloseEvent發生之後,就不可能再觸發ErrorEvent或StdoutEvent。

  它帶有一個參數:

    ARG0: Wheel的ID号,可以用來在一個session管理多個子程序時分開處理。

5. 子程序結束時session對子程序資源的回收

103sub handle_sig_child {

104  my ($heap, $sig, $pid, $exit_val) = @_[HEAP, ARG0, ARG1, ARG2];

105  my $child = delete $heap->{children_by_pid}{$pid};

106

107  return unless defined $child;

108  delete $heap->{children_by_wid}{$child->ID};

109  print "PID $$: Child-pid $pid exited\n";

110}

sig_child()是在特定子程序PROCESS_ID退出後,觸發相應事件以進行處理的友善方式。

它帶有多個參數:

my ($heap, $sig, $pid, $exit_val) = @_[HEAP, ARG0, ARG1, ARG2];

一個session可以注冊多個sig_child()句柄,但是每個子程序對應的隻有一個。

且它不會傳回任何有意義的值。

6. 子程序任務執行程式體

112#

113# task instance

114#

115sub task_stuff {

116  my ($file, $line) = @_[0..1];

117

118  if ( !open(TASKFILE, "$file")){

119    print STDERR "Cann't open the file $file\n";

120    exit;

121  }

122

123  my $index = 0;

124  foreach my $task (<TASKFILE>){

125    print "$task";

126    $index++;

127    last if ($index == $line);

128  }

129  close (TASKFILE);

130}

這部分就和普通程式設計别無二緻了:

擷取參數,打開檔案,讀取三行,關閉檔案。

隻是Line119,line125不會在子程序中直接列印輸出,

它們分别觸發StdoutEvent 和StderrEvent事件,

進而調用相應的狀态處理函數handle_task_result()和handle_task_debug()。

更多的事件和狀态處理函數的解析可以看官方的文檔:

https://metacpan.org/module/POE::Wheel

https://metacpan.org/module/POE::Kernel

《象》曰:天行健,君子以自強不息。

【白話】《象辭》說:天道運作周而複始,永無止息,誰也不能阻擋,君子應效法天道,自立自強,不停地奮鬥下去。