天天看点

POE::Wheel::Run示例详解

一、POE简介

perl的POE模块是一个事件驱动的有限状态机编程框架,

它用一个单进程模拟多线程的多任务执行模块,

并提供的非阻塞的IO操作(就像C语言下面的libevent库)。

它能干的事情很多,并且写起来代码很清晰,可读性非常好。

POE最重要的一点是,它尝试着把在事件驱动环境中编程的恼人细节给隐藏起来。

POE的组成部分包括:

states(状态),kernel(内核),session(任务),driver(驱动),filter,wheel和component等。

states: 就是有限状态机中的状态,

        对应着的就是处理这些状态的句柄--也就是函数。

kernel: 是POE的核心模块.

        POE的内核与OS的内核很相似: 它会跟踪后台的所有进程和数据,并调度运行的代码。

        可以用内核来设置POE进程的时钟,将要运行的状态进行排队,和执行各种低级服务,

        但在很多时候,并不需要直接和内核进行交互。

session:它等同于OS中的进程。

        session是从一个状态切换到另一个状态时运行的POE程序。

        它可以创建子session, 发送POE事件给其它session等。

        每个session都有一个名为head的哈希表,可以用来存储本session的私有数据,

        并且该session中的每个状态处理函数都可访问它。

        POE是一个很精简的多任务协作模型;

        所有的session都是在同个OS进程中执行,其本身上不是多线程或多进程的。

        因此,在POE编程中,尽量不要使用阻塞式的系统调用,否则会导致整个POE运行的阻塞。

driver: 是POE的I/O层的最低层。

        现在,POE的发布版本中只有一个driver -- POE::Driver::SysRW.

        它可以从文件句柄中读写数据.

filter: 它是将格式化的数据块转换成另一种格式的接口。

        例如,POE::Filter::HTTPD将HTTP 1.0的请求转换成HTTP::Request对象并返回;

        POE::Filter::Line将原始数据流转换成多行(就像Perl的<>操作)。

wheel:  它是处理任务的高级逻辑的可重用代码片断。

        它们以POE的方式将有用的代码做了封装。

        在POE中,通常使用wheel做的事情包括处理事件驱动的输入输出,易用的网络连接。

        Wheel通常使用Filter和Driver来通知和发送数据。

component: 它就是一个封装好了的可重用的session,可以被别的session控制。

        其它的session可以发送命令给它,也可以从它那接收事件,很像OS中的使用IPC进行进程通信。

        Component的例子包括,

        POE::Component::IRC, 用来创建基于POE的IRC客户端的接口;

        POE::Component::HTTP, Perl中由事件驱动的用户代码。

二、POE::Wheel简介

POE::Wheel是封装好了的用来执行特定任务的事件句柄包,

它负责管理激活对应事件处理句柄的事件监测器。

在POE::Wheel创建时,它会添加匿名的事件句柄给调用它的session。

因此,当session创建wheel时,就需要处理这些新的事件。

常用的Wheel有如下:

POE::Wheel::Curses        - Non-blocking input for Curses.

POE::Wheel::FollowTail    - Non-blocking file and FIFO monitoring.

POE::Wheel::ListenAccept  - Non-blocking server for existing sockets.

POE::Wheel::ReadLine      - Non-blocking console input, with full readline support.

POE::Wheel::ReadWrite     - Non-blocking stream I/O.

POE::Wheel::Run           - Non-blocking process creation and management.

POE::Wheel::SocketFactory - Non-blocking socket creation, supporting most protocols and modes.

三、POE::Wheel::Run简介

是一个非阻塞的进程创建和管理器。

因为POE本身不是多线程或多进程的,所以要创建子进程以并行处理时就要用到它啦。

父进程和子进程可以通过子进程的文件句柄STDIN, STDOUT和STDERR进行通信。

在父进程中,用POE::Wheel::Run对象表示子进程,

可以使用这个对象的PID()和kill()方法来查询和管理子进程。

父进程使用put()方法发送数据到子进程STDIN,

子进程的STDOUT和STDERR将会以事件方式发送给父进程。

POE::Wheel::Run对象同样可以在子进程关闭其输出文件句柄时通知父进程。

但父进程监视子进程退出更可靠的方式是使用:

 POE::Kernel的sig_child();

它会等待wheel的子进程退出并回收。

而且最好在所有的环境下都要用sig_child(),

不然,POE不会回收子进程。从而导致进程的泄漏,对于长时运行的程序来说,这是致命的。

对于这种情况,POE::Kernel在退出时会有warning提示。

POE::Wheel::Run的对象和子进程的通信,默认地是基于行的。

在程序开发时,可以通过使用POE::Filter对象来覆盖

"StdinFilter","StdoutFilter","StdioFilter"和"StderrFilter"事件句柄,

实现更多样的数据处理方式。

四、示例程序

本例是一很简单的示例程序,主要是用来说明整个开发流程。

程序实现的是用子进程打开文件,读取三行,

并在父进程中显示出这三行的内容。

闲话少说,正式开始程序:

1. 主程序

1  #!/usr/bin/perl -w

2

3  ###############################################################################

4  # \File

5  #   test_poe_wheel_run.pl

6  # \Descript

7  #   test POE::Wheel::Run

8  # \Author

9  #   Hank

10 # \Created date

11 #   2013-4-25

12 ###############################################################################

13 use strict;

14 use POE qw(Wheel::Run

15     Filter::Reference);

16

17 my $filename = "test_file.ini";

18 my $line_num = 3;

19

20 our $task_pid = 1;

21 $SIG{TERM} = $SIG{INT} = sub {

22 kill KILL => $task_pid;

23 exit;

24 };

25

26 #

27 # Main process

28 #

29 POE::Session->create(

30   inline_states => {

31     _start      => \&start_task,

32     _stop       => \&handle_task_shutdown,

33     task_result => \&handle_task_result,

34     task_done   => \&handle_task_done,

35     task_debug  => \&handle_task_debug,

36     sig_child   => \&handle_sig_child,

37   },

38   args => [$filename, $line_num],

39 );

40

41 $poe_kernel->run();

42

43 exit;

Line14,15:

  导入要用的POE模块,qw()声明是Perl用来一次性加载多个POE模块的简写方式,

  它和下面的代码等同:

    use POE;

    use POE::Wheel::Run;

    use POE::Filter::Reference;

Line17,18:

  定义了要读取的文件和行数,这么写是为了示例如何进行参数传递。

Line20~24:

  这段代码在POE::Wheel::Run开发中很有必要,它的作用是在整个POE进程用

  Ctrl+c或退出时kill掉子进程。

  否则会造成子进程失去父亲,被系统的init接管。

Line29:

  POE的session创建函数。

Line30~37:

  向POE注册本程序要处理的状态,以及这些状态对应的状态处理句柄。

  以"_"开始的"_start","_stop"是POE::Kernel默认的状态,分别用于session的启动和销毁。

  sig_child注册的POE::Kernel对子进程退出后,对其回收的状态处理句柄。

  其它三个状态都是自定义状态及其处理句柄。

Line38:

  是将参数传递给session的启动状态对应的处理函数start_task().

  POE使用了一个传递参数的特别方式:

  它将数组@_封装了很多额外的参数 -- 分别是:

     当前内核, session的引用, 状态名, Heap的引用,以及ARG0 ~ ARG9;

  要访问它们,可以使用数组@_加下标KERNEL, SESSION, STATE, HEAP, ARG0 ~ ARG9。

  这样的设计是为了最大化的提高运行速度。

  所以,在POE中的参数或数据在状态处理函数间的传递有两种方式:

    HEAP哈希,或ARG0~ARG9.

  Line38用的就是后一种,[$filename, $line_num]对应赋值到ARG0和ARG1。

Line41~43:

  启动POE::Kernel,由此便建立了一个用来探测并分派事件的主循环。整个程序就运转起来了。

  且run方法只有在所有session返回之后才会停止循环。

  之后,我们调用一个表示程序结束的提示符的exit系统方法来表示程序被终止。

  POE轮次处理每一个事件,每次只有一个事件句柄被运行。

  当事件句柄运行的时候,POE::Kernel自身也将被中断,在事件句柄返回之前,没有事件被分派。

  当各个session的事件被传送到主程序事件队列后,位于队列头部的事件被首先处理,

  新来的事件将被放置在队列的尾部。以此保证队列的轮次处理。

  POE::Kernek的run方法在最后一个session停止之后返回。

整个主程序的执行顺序是如下:

先创建session,

session创建完成后就发送第一个事件_start;

之后启动POE的kernel。

此时事件队列中已有一个_start事件,所以进入_start事件对应的状态处理函数start_task()。

2. _start状态处理函数

44 #

45 # sub-functions

46 #

47 sub start_task {

48   my ($kernel, $heap) = @_[KERNEL, HEAP];

49   my ($fname, $lnum)  = @_[ARG0..ARG1];

50   my @params = ($fname, $lnum); 

51

52   my $task = POE::Wheel::Run->new(

53     Program      => sub { task_stuff(@params) },

54     StdoutEvent  => "task_result",

55     StderrEvent  => "task_debug",

56     CloseEvent   => "task_done",

57   );

58

59   $kernel->sig_child($task->PID, "sig_child");

60   $task_pid = $task->PID;

61

62   # Wheel events include the wheel's ID.

63   $heap->{children_by_wid}{$task->ID} = $task;

64

65   # Signal events include the process ID.

66   $heap->{children_by_pid}{$task->PID} = $task;

67

68   print("Child pid ", $task->PID," started as wheel ", $task->ID, ".\n");

69 }

Line49:

  取得Line38经ARG0,ARG1传递的参数。

Line50:

  将变量打包到数组,以传给子进程。

Line52~57:

  创建POE::Wheel::Run对象,

  并将要处理的事件映射到session的state,以事件发生时触发相应的状态处理函数。

Line53:

  是指定子进程将要执行程序,子进程是以exec()方式运行.

  If Program holds a scalar, its value will be executed as exec($program). 

     Shell metacharacters are significant, per exec(SCALAR) semantics.

  If Program holds an array reference, it will executed as exec(@$program). 

     As per exec(ARRAY), shell metacharacters will not be significant.

  If Program holds a code reference, that code will be called in the child process  

Line59:

  指定进程退出时用来回收的事件。

Line62~66:

  将子进程的进程号和Wheel的ID以哈希的方式存储在HEAP中,以用于本session的其它状态处理函数。

3. _stop状态处理函数

71 sub handle_task_shutdown {

72   my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];

73   my $task_id = $_[ARG0];

74

75   ## delete all wheels.

76   delete $heap->{wheel};

77

78   ## clear your alias

79   #$kernel->alias_remove($heap->{alias});

80

81   ## clear all alarms you might have set

82   #$kernel->alarm_remove_all();

83

84   return;

85 }

如代码所示,在session退出时进行各种资源的回收。

4. 子进程STDOUT,STDERR及进程结束事件触发的状态处理函数

87 sub handle_task_result {

88   my ($stdout_line, $wheel_id) = @_[ARG0, ARG1];

89   my $child = $_[HEAP]{children_by_wid}{$wheel_id};

90

91   print "pid ", $child->PID, " STDOUT: $stdout_line\n";

92 }

93

94 sub handle_task_debug {

95   my $result = $_[ARG0];

96 }

97

98 sub handle_task_done {

99   my ($kernel, $heap, $task_id) = @_[KERNEL, HEAP, ARG0];

100  delete $heap->{task}->{$task_id};

101}

Line87~92:

  StdoutEvent由子进程的打印到STDOUT触发。

  它带有两个参数:

    ARG0: 子进程写到STDOUT的信息;

    ARG1: 读取这个输出的Wheel的ID号;

Line94~96:

  StderrEvent和StdoutEvent的处理方式一样,在子进程写STDERR时触发。

  它带有两个参数:

    ARG0: 子进程写到STDERR的信息;

    ARG1: 读取这个输出的Wheel的ID号;

Line98~101:

  CloseEvent在子进程关闭它最后一次打开的文件句柄时触发,

  但它不是子进程结束时的信号。对于就种情况使用sig_child()来处理。

  在CloseEvent发生之后,就不可能再触发ErrorEvent或StdoutEvent。

  它带有一个参数:

    ARG0: Wheel的ID号,可以用来在一个session管理多个子进程时分开处理。

5. 子进程结束时session对子进程资源的回收

103sub handle_sig_child {

104  my ($heap, $sig, $pid, $exit_val) = @_[HEAP, ARG0, ARG1, ARG2];

105  my $child = delete $heap->{children_by_pid}{$pid};

106

107  return unless defined $child;

108  delete $heap->{children_by_wid}{$child->ID};

109  print "PID $$: Child-pid $pid exited\n";

110}

sig_child()是在特定子进程PROCESS_ID退出后,触发相应事件以进行处理的方便方式。

它带有多个参数:

my ($heap, $sig, $pid, $exit_val) = @_[HEAP, ARG0, ARG1, ARG2];

一个session可以注册多个sig_child()句柄,但是每个子进程对应的只有一个。

且它不会返回任何有意义的值。

6. 子进程任务执行程序体

112#

113# task instance

114#

115sub task_stuff {

116  my ($file, $line) = @_[0..1];

117

118  if ( !open(TASKFILE, "$file")){

119    print STDERR "Cann't open the file $file\n";

120    exit;

121  }

122

123  my $index = 0;

124  foreach my $task (<TASKFILE>){

125    print "$task";

126    $index++;

127    last if ($index == $line);

128  }

129  close (TASKFILE);

130}

这部分就和普通编程别无二致了:

获取参数,打开文件,读取三行,关闭文件。

只是Line119,line125不会在子进程中直接打印输出,

它们分别触发StdoutEvent 和StderrEvent事件,

从而调用相应的状态处理函数handle_task_result()和handle_task_debug()。

更多的事件和状态处理函数的解析可以看官方的文档:

https://metacpan.org/module/POE::Wheel

https://metacpan.org/module/POE::Kernel

《象》曰:天行健,君子以自强不息。

【白话】《象辞》说:天道运行周而复始,永无止息,谁也不能阻挡,君子应效法天道,自立自强,不停地奋斗下去。