天天看点

【Swoole】多进程process多进程初试管道通信的例子消息队列的例子

多进程初试

用Swoole内置的函数写多进程是一件非常苦逼的事情啊,进程间的通信,异步回调(callback)做的都不是很好。还好有Swoole,拯救了PHP在这方面的薄弱,真是让PHP如虎添翼啊。

Swoole,适合中高级程序员,不适合初级程序员,编写者也是冲着这么一个思想来的,他们认为你已经有了调试和看懂原理的能力了,对于我这种小白来说,学习还是非常吃力的。

直接来代码吧

<?php
/**
* swoole_process只能用在cli模式中
* new swoole_process()
* 参数1 mixed $funtion 子进程创建成功后执行的函数
* 参数2 $redirect stdin stdout 重定向子进程的标准输入和标准输出。启用此项后,在进程内echo将不是打印到屏幕,而是写入到管道,读取键盘输入将会变成从管道读取.默认为阻塞读取.
* $create_pipe 是否创建管道.启用
* $redirect_stdin_stdout后,此选项将忽略用户参数,强制为true,如果子进程内没有进程间通信,可以设置为false.
*/

$test = "test";

//创建进程后的callback
function doProcess(swoole_process $worker){
    global $test;
    var_dump($test);    
    var_dump($worker);
    echo "PID ".$worker->pid.PHP_EOL;
    sleep();
}

//通过匿名函数创建进程
$process = new swoole_process(function(swoole_process $worker) use($test){
    echo "我是匿名函数创建的".PHP_EOL;
    var_dump($test);

});
$pid = $process->start();//start方法会返回创建的子进程的pid
//echo "pid :".$pid.PHP_EOL;

//传入callback创建进程
$process = new swoole_process("doProcess");
$pid = $process->start();//start方法会返回创建的子进程的pid
//等待结束 不等待结束的话,自己也会结束,但是容易出现僵尸进程. 这里会阻塞,直到所有进程执行完毕
swoole_process::wait();//通过设置参数true或false可以设置是否阻塞等待,默认为阻塞
//等创建的俩进程全结束了才会继续往下执行,上面创建的两个进程是异步的,即可以认为是同时进行的
echo "结束了".PHP_EOL;
           

运行结果为

我是匿名函数创建的
string() "test"
string() "test"
object(swoole_process)#3 (6) {//worker里面存了管道,callback,消息队列,pid等内容
  ["pipe"]=>
  int()
  ["callback"]=>
  string() "doProcess"
  ["msgQueueId"]=>
  NULL
  ["msgQueueKey"]=>
  NULL
  ["pid"]=>
  int()
  ["id"]=>
  NULL
}
PID 
结束了
           

管道通信的例子

模拟多进程,请求多个URL的地址。此例子可以扩展为异步多进程读取多个文件,上传多个文件等。

<?php
/**
* swoole_event_add()
* 参数1:int $sock
* int 文件描述符
* mixed $read callback 就是stream_socket_client/fsockopen创建的资源
* sockets资源,就是sockets扩展中socket_create创建的资源,需要在编译时加入./configure --enable-sockets
* 参数2: 可读回调函数
*/
echo "startTime: ".date("H:i:s").PHP_EOL;
$workers = [];//进程池
$worker_num = ;//创建进程的数量
$url = [ 
    "www.baidu.com",
    "www.360.cn",
    "blog.diligentyang.com",
];
$result = []; 

for($i=; $i<$worker_num; $i++){
    $process = new swoole_process('doProcess');//创建新进程
    $pid = $process->start();//启动进程,返回pid
    $process->write($i);//向管道内写入当前i的值
    $workers[$pid] = $process;//放入进程池中
}

//创建进程执行函数
function doProcess(swoole_process $worker){
    global $url;//获取全局的URL,因为是共享内存的
    $i = $worker->read();//读取i值
    echo $url[$i].PHP_EOL;
    $res = curlTest($url[$i], (-$i*));//执行curl操作   
    $worker->write("pid :". $worker->pid.' res: '.$res.PHP_EOL);//往管道里面写数据 pipe 将curl的结果再传回主进程
    echo "写入信息: pid: ".$worker->pid.' res: '.$res.' './*$worker->callback.*/PHP_EOL;
    //sleep(5);
    //$worker->exit(0); 退出子进程
}
//echo "sleep 5".PHP_EOL;
//添加进程事件,像每个子进程添加需要执行的动作
foreach($workers as $process){
    //添加
    //swoole_event_add($process->pipe, function($pipe) use($process){//加到事件中变为异步模式
    //  $data = $process->read();//读取管道数据
    //  echo "接收到: ".$data.PHP_EOL;
    //  $result[] = $data;
    //});

    $data = $process->read();//读取管道数据  这里是同步阻塞读取
    echo "接收到: ".$data.PHP_EOL;
    $result[] = $data;
}

function curlTest($url, $stime){//假装执行curl操作
    sleep($stime);//sleep一定时间,此处为10s,8s,6s
    return "handle ". $url ." finished";
}

for($i = ; $i < $worker_num; $i++)
{
    $ret = swoole_process::wait();
    $pid = $ret['pid'];
    unset($workers[$pid]);
    echo "Worker Exit, PID=".$pid.PHP_EOL;
}

var_dump($result);

echo "结束".PHP_EOL;
echo "startTime: ".date("H:i:s").PHP_EOL;

           

执行结果为

startTime: ::
www.baidu.com
www.360.cn
blog.diligentyang.com
写入信息: pid:  res: handle blog.diligentyang.com finished 
写入信息: pid:  res: handle www.360.cn finished 
写入信息: pid:  res: handle www.baidu.com finished 
接收到: pid : res: handle www.baidu.com finished

接收到: pid : res: handle www.360.cn finished

接收到: pid : res: handle blog.diligentyang.com finished

Worker Exit, PID=23961
Worker Exit, PID=23962
Worker Exit, PID=23960
array(3) {
  [0]=>
  string(46) "pid : res: handle www.baidu.com finished
"
  [1]=>
  string(43) "pid : res: handle www.360.cn finished
"
  [2]=>
  string(54) "pid : res: handle blog.diligentyang.com finished
"
}
结束
startTime: ::
           

可以看出,多进程模式还是省了大量时间的,实际上这几个要是顺序阻塞运行,会消耗24s+的时间,而多进程只需要耗时最长的那一个时间,即10s就可以完成。

消息队列的例子

<?php
<?php
$workers = [];//进程仓库
$worker_num = ;//最大进程数

for($i=; $i<$worker_num; $i++){
    //第三个参数改为false,才能实现进程通讯
    $process = new swoole_process('doProcess', false, false);//创建子进程
    $process->useQueue();//开启队列,类似于全局函数
    $pid = $process->start();
    $workers[$pid] = $process;
}

//进程执行函数
function doProcess(swoole_process $worker){
    $recv = $worker->pop();//默认是8192个长度
    echo "从主进程获取到的数据: ". $recv."---true pid".$worker->pid.PHP_EOL;
    sleep();
    $worker->exit();
}

//主进程 向子进程添加
foreach($workers as $pid => $process){
    $process->push("hello 子进程 $pid");
}

//等待子进程结束回收资源
for($i=; $i<$worker_num; $i++){
    $ret = swoole_process::wait();//等待执行完成
    $pid = $ret['pid'];
    unset($workers[$pid]);
    echo "子进程退出 $pid".PHP_EOL;
}

echo "this is the end".PHP_EOL;
           

运行结果为

从主进程获取到的数据: hello 子进程 ---true pid24288
从主进程获取到的数据: hello 子进程 ---true pid24287
子进程退出 
子进程退出 
this is the end
           

可以看出两个进程pop的顺序还是未知的。

如果有不懂的地方,可以参考官网文档,结合例子。