1、首先安裝PHP擴充
//查找
apt list |grep php-amqp
//安裝
apt-get install php-amqp
2、檢查安裝情況（輸出的模組清單中應包含 amqp）
root@server:/var/www/html/tci-api# php -m
[PHP Modules]
amqp
bcmath
calendar
Core
ctype
curl
date
3、laravel安裝第三方擴充
composer require php-amqplib/php-amqplib
4、修改配置檔案（在 config/database.php 中加入以下設定，程式以 config('database.rrabitmq') 讀取）
// RabbitMQ connection settings; the controller reads this array via config('database.rrabitmq').
// Each value is taken from the named environment variable, with the second argument used as the fallback.
'rrabitmq' => [
'host' => env('DEMO_MQ_HOST', '127.0.0.1'),
'port' => env('DEMO_MQ_PORT', 5672),
'user' => env('DEMO_MQ_USER', 'guest'),
'pwd' => env('DEMO_MQ_PWD', 'guest'),
'vhost' => env('DEMO_MQ_VHOST', '/'),
// NOTE(review): 'debug' is not read by the controller code shown in this file — confirm whether it is used elsewhere.
'debug' => env('DEMO_MQ_Debug', false),
],
5、編寫控制器（生產者：推送消息到 RabbitMQ）
<?php
namespace App\Http\Controllers;
use Illuminate\Http\Request;
use Illuminate\Pagination\Paginator;
use App\Models\Test;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
/**
 * Demo controller that publishes one test message to RabbitMQ through
 * php-amqplib, using publisher confirms to report whether the broker
 * acknowledged the message.
 */
class ListController extends Controller
{
    /** Name of the exchange the message is published to. */
    private const EXCHANGE = 'ex';

    /** Name of the queue that is declared and bound to the exchange. */
    private const QUEUE = 'qr';

    /**
     * Reads the broker settings from config('database.rrabitmq') and
     * publishes a fixed test message.
     *
     * @throws \RuntimeException when the configuration entry is missing or is not an array
     */
    public function index()
    {
        $conf = config('database.rrabitmq');
        if (!is_array($conf)) {
            // Fail with a clear message instead of passing null offsets to the connection.
            throw new \RuntimeException("RabbitMQ configuration 'database.rrabitmq' is missing or invalid.");
        }

        // The global constants are kept for any code outside this class that still
        // reads them. They are guarded because define() on an existing constant
        // raises a warning, which made a second call to this action fail.
        defined('ExChange') || define('ExChange', self::EXCHANGE);
        defined('Queue') || define('Queue', self::QUEUE);

        $messageBody = 'hello word'; // message body to publish
        $this->send_mq($conf, $messageBody);
    }

    /**
     * Publishes one persistent text message to the exchange and waits for the
     * broker to confirm it.
     *
     * The exchange and queue are declared and bound on every call. Both the
     * binding and the publish use the default (empty) routing key.
     *
     * @param array  $conf        connection settings with the keys host, port, user, pwd and vhost
     * @param string $messageBody message body to publish
     */
    public function send_mq($conf, $messageBody)
    {
        $connection = new AMQPStreamConnection(
            $conf['host'],
            $conf['port'],
            $conf['user'],
            $conf['pwd'],
            $conf['vhost']
        );

        try {
            // Open a channel on the connection.
            $channel = $connection->channel();

            try {
                // Called when the broker acknowledges a published message.
                $channel->set_ack_handler(function (AMQPMessage $message) {
                    $this->writeToLog($message->body, 'success');
                });

                // Called when the broker rejects a published message.
                $channel->set_nack_handler(function (AMQPMessage $message) {
                    $this->writeToLog($message->body, 'fail');
                });

                // Switch the channel into publisher-confirm mode.
                $channel->confirm_select();

                // Durable direct exchange: passive = false, durable = true, auto_delete = false.
                $channel->exchange_declare(self::EXCHANGE, AMQPExchangeType::DIRECT, false, true, false);

                // Durable queue: passive = false, durable = true, exclusive = false, auto_delete = false.
                $channel->queue_declare(self::QUEUE, false, true, false, false);

                // Bind the queue to the exchange.
                $channel->queue_bind(self::QUEUE, self::EXCHANGE);

                $message = new AMQPMessage($messageBody, [
                    'content_type' => 'text/plain',
                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                ]);
                $channel->basic_publish($message, self::EXCHANGE);

                // Block until the broker has acked or nacked the message, so one of
                // the handlers above runs before the channel is closed.
                $channel->wait_for_pending_acks();
            } finally {
                $channel->close();
            }
        } finally {
            // Always release the connection, even when declaring or publishing failed.
            $connection->close();
        }
    }

    /**
     * Echoes one line with the exchange name, queue name, message body and
     * delivery status.
     *
     * @param string $messageBody body of the message that was confirmed or rejected
     * @param string $msg         delivery status label
     */
    private function writeToLog($messageBody, $msg)
    {
        echo "交換機" . self::EXCHANGE . "隊列" . self::QUEUE . "消息" . $messageBody . '狀态' . $msg . PHP_EOL;
    }
}