天天看點

swoole 定時執行php,基于 swoole 下 異步隊列和毫秒定時任務 API

1.在 Server 程式中如果需要執行很耗時的操作,比如一個聊天伺服器發送廣播,Web 伺服器中發送郵件。如果直接去執行這些函數就會阻塞目前程序,導緻伺服器響應變慢。

Swoole 提供了異步任務處理的功能,可以投遞一個異步任務到 TaskWorker 程序池中執行,不影響目前請求的處理速度。(官網說明)

1. 服務端代碼

執行服務端監聽端口9501。通過設定daemonize這個參數,以守護程序在系統去維護這個TaskWorker程序池。我們用戶端将消息傳遞給服務端,服務端異步将資料請求放入程序池隊列運作,從大大縮短了響應時間。

* Created by PhpStorm

* User: pl

* Date: 2020/5/22

* Time: 15:23

*/

class TaskServer

{

private $server;

public function __construct()

{

$this->server = new Swoole\Server('127.0.0.1', 9501);

$this->server->set([

'task_worker_num' => 3, //開啟的程序數 一般為cup核數 1-4倍

'daemonize' => 1, //已守護程序執行該程式

'max_request' => 10000, //worker程序最大任務數

'dispatch_mode' => 2, //設定為争搶模式

'task_ipc_mode' => 3, //設定為消息隊列模式

]);

$this->server->on('Receive', array($this, 'onReceive'));

$this->server->on('Task', array($this, 'onTask'));

$this->server->on('Finish', array($this, 'onFinish'));

$this->server->start();

}

public function onReceive(swoole_server $server, $fd, $form_id, $data)

{

$this->server->task($data);

}

public function onTask($server, $fd, $from_id, $data)

{

$data = json_decode($data, true);

try {

$log_txt = date('Y-m-d H:i:s') . "開始執行任務" . PHP_EOL;

$this->log($log_txt);

$type = $data['data']['type'];

$time = intval($data['data']['timing']);

unset($data['data']['timing']);

unset($data['data']['type']);

if (intval($type) == 1) {

$this->request_curl($data['url'], $data['data'], $data['data']['http_type']);

} else {

Swoole\Timer::after($time, function () use ($data) {

$this->request_curl($data['url'], $data['data'], $data['data']['http_type']);

});

}

} catch (\Exception $exception) {

$log_txt = date('Y-m-d H:i:s') . "執行任務失敗發生錯誤" . PHP_EOL;

$this->log($log_txt);

}

}

public function onFinish($server, $task_id, $data)

{

$log_txt = date('Y-m-d H:i:s') . "$data" . PHP_EOL;

$this->log($log_txt);

}

public function request_curl($url = '', $request_data = '', $request_type = 'get', $headers = [], $is_ssl = false)

{

$ch = curl_init(); //curl初始化

if ($request_type == 'get' && !empty($request_data)) {

$num = 0;

foreach ($request_data as $key => $value) {

if ($num == 0) {

$url .= '?' . $key . '=' . $value;

} else {

$url .= '&' . $key . '=' . $value;

}

$num++;

}

$num = 0;

}

//區分get和post

curl_setopt($ch, CURLOPT_URL, $url); //URL位址

curl_setopt($ch, CURLOPT_HEADER, 0); //頭資訊不輸出

//如果成功隻将結果傳回,不自動輸出任何内容

curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);

//post類型就實作此結果

if ($request_type == 'post') {

//設定為POST方式

curl_setopt($ch, CURLOPT_POST, 1);

//POST資料

curl_setopt($ch, CURLOPT_POSTFIELDS, $request_data);

//當post資料大于1024時強制執行

curl_setopt($ch, CURLOPT_HTTPHEADER, array("Expect:"));

}

//判斷是否繞過證書

if ($is_ssl) {

curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);//繞過ssl驗證

curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);

}

if (!empty($headers)) curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);

$result = curl_exec($ch); //執行

if ($result == FALSE) return false;

curl_close($ch); //關閉資源

return $result;

}

public function log($log_txt)

{

$log = 'log/' . date('Y_m_d') . 'log';

if (!file_exists($log)) {

touch($log);

chown($log, 0777);

}

$file_log = fopen($log, "a");

fputs($file_log, $log_txt);

fclose($file_log);

}

}

$task = new TaskServer();

2.用戶端代碼

class ClientRequest

{

private $client;

private $params; //請求參數

public function __construct($params)

{

$this->client = new swoole_client(SWOOLE_SOCK_TCP | SWOOLE_KEEP);

$this->params = $params;

}

public function connect()

{

if (!$this->client->connect('127.0.0.1', 9501, 1)) {

return json_encode([

'code' => 500,

'err_msg' => '連結異步用戶端失敗'

]);

}

$params = $this->params;

$array['url'] = $params['url'];

unset($params['url']);

$array['data'] = $params;

$this->client->send(json_encode($array, JSON_UNESCAPED_UNICODE));

}

}

if($_SERVER['REQUEST_METHOD']!='POST') throw new ErrorException('路由不存在','404');

if(!array_key_exists('HTTP_TOKEN',$_SERVER)){

header('Content-type: application/json');

echo json_encode(['code'=>'200007','token is not empty']);exit();

}

if($_SERVER['HTTP_TOKEN']!= API_KEY ) {

header('Content-type: application/json');

echo json_encode(['code'=>'200007','error token']);exit();

}

$params = $_POST;

$client = new ClientRequest($params);

$client->connect();

開始執行服務端程式,php TaskServer.php我們以接口形式上去調用另外一個耗時接口.簡單對比一下響應速度。

swoole 定時執行php,基于 swoole 下 異步隊列和毫秒定時任務 API

基于 swoole 下 異步隊列 API

swoole 定時執行php,基于 swoole 下 異步隊列和毫秒定時任務 API

基于 swoole 下 異步隊列 API

swoole 定時執行php,基于 swoole 下 異步隊列和毫秒定時任務 API

基于 swoole下 異步隊列API

最後補充:基于swoole 一個簡單的異步隊列就完成了。可以将此隊列封裝成api隊列接口,将它丢到task裡面去慢慢執行吧~哈哈

補充新增的一次定時器任務,支援毫秒級

3.定時任務和異步接口

說明:

1、在swoole 中 毫秒【如 1000 表示 1 秒,v4.2.10 以下版本最大不得超過 86400000【一天】】

2、背景定時器操作接口:操作方法如下

當操作時間在一天内 則直接執行定時器

3、大于一天時

系統執行一個定時crontab任務 ->

每隔12-24小時 運作一次 将資料庫執行接口時間大于目前時間且不超過一天的資料資料執行該接口

關 于使用者執行撤銷則通過推送資料接口去判斷該資料是否進行推送

4、當該需要給執行接口添加參數時 直接将參數放入接口

接口文檔

ClientRequest.php

請求方法 post

參數

參數

是否必選

備注

限制

新增

token

密鑰【header】xxxx

url

執行接口的位址

type

1、異步執行該接口 2、定時執行該接口

http_type

get、post

timing

定時執行該接口的時間 機關【毫秒】