官方文档: top-think/think-queue
应用场景:
队列适用与多个用户同时执行一个操作,锁适用与单个用户多次执行同一个操作
- 消息队列,发送邮件、短信
缺点:
一旦需处理数据加入到任务内就不能删除,如果删除可以使用redis
队列配置:
Tp5.1 config/queue.php 配置文件 ,启动reids 服务
/**
* 消息队列配置
* 内置驱动:redis、database、topthink、sync
*/
return [
//sync驱动表示取消消息队列还原为同步执行
//'connector' => 'Sync',
//Redis驱动
'connector' => 'redis',
"expire"=>60,//任务过期时间默认为秒,禁用为null
"default"=>"default",//默认队列名称
"host"=>Env::get("redis.host", "127.0.0.1"),//Redis主机IP地址
"port"=>Env::get("redis.port", 6379),//Redis端口
"password"=>Env::get("redis.password", "123456"),//Redis密码
"select"=>5,//Redis数据库索引
"timeout"=>0,//Redis连接超时时间
"persistent"=>false,//是否长连接
//Database驱动
//"connector"=>"Database",//数据库驱动
//"expire"=>60,//任务过期时间,单位为秒,禁用为null
//"default"=>"default",//默认队列名称
//"table"=>"jobs",//存储消息的表明,不带前缀
//"dsn"=>[],
//Topthink驱动 ThinkPHP内部的队列通知服务平台
//"connector"=>"Topthink",
//"token"=>"",
//"project_id"=>"",
//"protocol"=>"https",
//"host"=>"qns.topthink.com",
//"port"=>443,
//"api_version"=>1,
//"max_retries"=>3,
//"default"=>"default"
];
代码示例:
/*1.先在控制器里添加下面两个方法*/
namespace app\index\controller;
use think\Controller;
use think\Queue;
use think\Db;
class Index extends Controller
{
public function queueTest(){
$data = [
'order_no' =>rand(100000,999999),
];
$this->add($data['order_no']);
// 一次执行一个任务
// 类的命名空间\类名
// 类的命名空间\类名@方法
$res =Queue::push('app\queue\controller\Execjob', $data, $queue = null); // 创建发布任务
var_dump($res); // database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false
}
public function add($orderNo){
$data =[
'order_no'=>$orderNo,
'msg'=>$orderNo,
'create_time'=>date('Y-m-d H:i:s'),
];
Db::name('tp5_test')->insert($data);
}
}
/*2.在 app/queue/controller/Execjob 创建文件,要执行的任务内容*/
namespace app\queue\controller;
use think\Controller;
use think\facade\Cache;
use think\queue\Job;
class Execjob extends Controller
{
public function fire(Job $job, $data)
{
//....这里执行具体的任务
if($this->jobDone($data))
{
$job->delete();
print("<info>Hello Job has been done and deleted"."</info>\n");
}else{
$job->release(3); //$delay为延迟时间
}
if ($job->attempts() > 3) {
//通过这个方法可以检查这个任务已经重试了几次了
}
//如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法
// $job->delete();
// 也可以重新发布这个任务
// $job->release($delay); //$delay为延迟时间
}
public function failed($data)
{
// ...任务达到最大重试次数后,失败了
}
public function jobDone($data)
{
print("<info>Job is Done status!"."</info> \n");
return Db::name('tp5_test')->where('order_no',$data['order_no'])->update(['status'=>2]);
}
}
监听队列
写完之后再控制器切换到当前目录下执行,任务的消费与删除
// --queue 队列名称,不指定监听所有队列
php think queue:listen --queue send_mail
测试访问页面
http://tp.com/pub/test/index,任务的推送
<?php
namespace app\pub\controller;
use app\queue\controller\Jobqueue;
use think\Controller;
class Test extends Controller
{
public function index(){
$job_queue=new Jobqueue();
$job_queue->index('send_mail',['[email protected]','sa','测试']);
}
}

队列执行完成后,处于等待下次调用
队列流程:创建 ->监听-> 推送 -> 消费 -> 删除
相关文章:tp5.1_队列queue学习
thinkphp-queue 笔记
think-queue使用教程-用户注册场景异步发送邮件
think-queue