天天看點

Swoole Task 的應用

目錄

  • 概述
  • 代碼
    • server.php
    • client.php
  • 小結
  • 擴充
  • 參考文檔

這是關于 Swoole 學習的第二篇文章:Swoole Task 的應用。

  • 第一篇:Swoole Timer 的應用

Swoole 異步Task,主要實作調用異步任務的執行。

常用的場景:異步支付處理、異步訂單處理、異步日志處理、異步發送郵件/短信等。

Swoole 的實作方式是 worker 程序處理資料請求,配置設定給 task 程序執行。

官方介紹:

task 底層使用Unix Socket管道通信,是全記憶體的,沒有IO消耗。單程序讀寫性能可達100萬/s,不同的程序使用不同的管道通信,可以最大化利用多核。

本地版本:PHP 7.2.6、Swoole 4.3.1。

不多說,先看效果圖:

Swoole Task 的應用

<?php

class Server
{
    private $serv;

    public function __construct() {
        $this->serv = new swoole_server('0.0.0.0', 9501);
        $this->serv->set([
            'worker_num'      => 2, //開啟2個worker程序
            'max_request'     => 4, //每個worker程序 max_request設定為4次
            'task_worker_num' => 4, //開啟4個task程序
            'dispatch_mode'   => 2, //資料包分發政策 - 固定模式
        ]);

        $this->serv->on('Start', [$this, 'onStart']);
        $this->serv->on('Connect', [$this, 'onConnect']);
        $this->serv->on("Receive", [$this, 'onReceive']);
        $this->serv->on("Close", [$this, 'onClose']);
        $this->serv->on("Task", [$this, 'onTask']);
        $this->serv->on("Finish", [$this, 'onFinish']);

        $this->serv->start();
    }

    public function onStart($serv) {
        echo "#### onStart ####".PHP_EOL;
        echo "SWOOLE ".SWOOLE_VERSION . " 服務已啟動".PHP_EOL;
        echo "master_pid: {$serv->master_pid}".PHP_EOL;
        echo "manager_pid: {$serv->manager_pid}".PHP_EOL;
        echo "########".PHP_EOL.PHP_EOL;
    }

    public function onConnect($serv, $fd) {
        echo "#### onConnect ####".PHP_EOL;
        echo "用戶端:".$fd." 已連接配接".PHP_EOL;
        echo "########".PHP_EOL.PHP_EOL;
    }

    public function onReceive($serv, $fd, $from_id, $data) {
        echo "#### onReceive ####".PHP_EOL;
        echo "worker_pid: {$serv->worker_pid}".PHP_EOL;
        echo "用戶端:{$fd} 發來的Email:{$data}".PHP_EOL;
        $param = [
            'fd'    => $fd,
            'email' => $data
        ];
        $rs = $serv->task(json_encode($param));
        if ($rs === false) {
            echo "任務配置設定失敗 Task ".$rs.PHP_EOL;
        } else {
            echo "任務配置設定成功 Task ".$rs.PHP_EOL;
        }
        echo "########".PHP_EOL.PHP_EOL;
    }

    public function onTask($serv, $task_id, $from_id, $data) {
        echo "#### onTask ####".PHP_EOL;
        echo "#{$serv->worker_id} onTask: [PID={$serv->worker_pid}]: task_id={$task_id}".PHP_EOL;

        //業務代碼
        for($i = 1 ; $i <= 5 ; $i ++ ) {
            sleep(2);
            echo "Task {$task_id} 已完成了 {$i}/5 的任務".PHP_EOL;
        }

        $data_arr = json_decode($data, true);
        $serv->send($data_arr['fd'] , 'Email:'.$data_arr['email'].',發送成功');
        $serv->finish($data);
        echo "########".PHP_EOL.PHP_EOL;
    }

    public function onFinish($serv,$task_id, $data) {
        echo "#### onFinish ####".PHP_EOL;
        echo "Task {$task_id} 已完成".PHP_EOL;
        echo "########".PHP_EOL.PHP_EOL;
    }

    public function onClose($serv, $fd) {
        echo "Client Close.".PHP_EOL;
    }
}

$server = new Server();
           

<?php

class Client
{
    private $client;

    public function __construct() {
        $this->client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);

        $this->client->on('Connect', [$this, 'onConnect']);
        $this->client->on('Receive', [$this, 'onReceive']);
        $this->client->on('Close', [$this, 'onClose']);
        $this->client->on('Error', [$this, 'onError']);
    }

    public function connect() {
        if(!$fp = $this->client->connect("127.0.0.1", 9501 , 1)) {
            echo "Error: {$fp->errMsg}[{$fp->errCode}]".PHP_EOL;
            return;
        }
    }

    public function onConnect($cli) {
        fwrite(STDOUT, "輸入Email:");
        swoole_event_add(STDIN, function() {
            fwrite(STDOUT, "輸入Email:");
            $msg = trim(fgets(STDIN));
            $this->send($msg);
        });
    }

    public function onReceive($cli, $data) {
        echo PHP_EOL."Received: ".$data.PHP_EOL;
    }

    public function send($data) {
        $this->client->send($data);
    }

    public function onClose($cli) {
        echo "Client close connection".PHP_EOL;
    }

    public function onError() {

    }
}

$client = new Client();
$client->connect();
           

一、上面的配置總共開啟了幾個程序?

總共8個程序(1個master程序、1個manager程序、4個task程序、2個worker程序)

重新運作的可能與上圖程序号不一緻:

Swoole Task 的應用

master程序:22481

manager程序:22485

task程序:22488、22489、22490、22491

worker程序:22492、22493

參考官方提供的程序圖:

Swoole Task 的應用

二、為什麼執行了5次後,worker程序号發生了改變?

因為我們設了置worker程序的max_request=4,一個worker程序在完成最大請求次數任務後将自動退出,程序退出會釋放所有的記憶體和資源,這樣的機制主要是解決PHP程序記憶體溢出的問題。

三、當task執行任務異常,我們kill一個task程序,會再新增一個嗎?

會。

四、如何設定 task_worker_num ?

最大值不得超過 SWOOLE_CPU_NUM * 1000。

檢視本機 CPU 核數:

echo "swoole_cpu_num:".swoole_cpu_num().PHP_EOL;
           

根據項目的任務量決定的,比如:1秒會産生200個任務,執行每個任務需要500ms。

想在1s中執行完成200個任務,需要100個task程序。

100 = 200/(1/0.5)

五、如何設定 worker_num ?

預設設定為本機的CPU核數,最大不得超過 SWOOLE_CPU_NUM * 1000。

比如:1個請求耗時10ms,要提供1000QPS的處理能力,那就必須配置10個程序。

10 = 0.01*1000

假設每個程序占用40M記憶體,10個程序就需要占用400M的記憶體。

  • Server->taskwait
  • Server->taskWaitMulti
  • Server->taskCo

  • https://wiki.swoole.com/wiki/page/134.html

本文歡迎轉發,轉發請注明作者和出處,謝謝!

Swoole Task 的應用

作者:新亮筆記(關注公衆号,可申請添加微信好友)

出處:https://www.cnblogs.com/xinliangcoder

本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接,否則保留追究法律責任的權利。