天天看點

laravel 使用rabbitmq

1、首先安裝PHP擴充

//查找
apt list |grep php-amqp
//安裝
apt-get install php-amqp
           

2、檢查安裝情況

root@server:/var/www/html/tci-api# php -m
[PHP Modules]
amqp
bcmath
calendar
Core
ctype
curl
date
           

3、laravel安裝第三方擴充

composer require php-amqplib/php-amqplib
           

4、修改配置檔案

// RabbitMQ connection settings. The controller reads this entry with
// config('database.rrabitmq'), so it must live in the "database" config file
// and the key spelling ("rrabitmq") must match that lookup exactly.
'rrabitmq' => [
        'host' => env('DEMO_MQ_HOST', '127.0.0.1'), // broker host
        'port' => env('DEMO_MQ_PORT', 5672), // broker port
        'user' => env('DEMO_MQ_USER', 'guest'), // login user
        'pwd' => env('DEMO_MQ_PWD', 'guest'), // login password
        'vhost' => env('DEMO_MQ_VHOST', '/'), // virtual host
        // NOTE(review): 'debug' is not read by the controller shown in this
        // article; confirm whether anything else consumes it.
        'debug' => env('DEMO_MQ_Debug', false),
    ],
           

5、編寫控制器代碼

<?php
namespace App\Http\Controllers;
use Illuminate\Http\Request;
use Illuminate\Pagination\Paginator;
use App\Models\Test;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

/**
 * Demo controller that publishes a message to RabbitMQ with publisher
 * confirms enabled and echoes the broker's ack/nack for each message.
 */
class ListController extends Controller
{
    /**
     * Exchange that messages are published to.
     *
     * Held as a class constant rather than a runtime define(): the previous
     * define() inside index() emitted a warning when index() ran twice in the
     * same process, and left send_mq() unusable unless index() had run first.
     */
    private const EXCHANGE = 'ex';

    /** Queue that is declared and bound to the exchange. */
    private const QUEUE = 'qr';

    /**
     * Loads the connection settings from config and publishes a sample message.
     */
    public function index()
    {
        $conf = config('database.rrabitmq');
        $messageBody = 'hello word'; // payload to publish
        $this->send_mq($conf, $messageBody);
    }

    /**
     * Publishes one persistent text message and waits for the broker to confirm it.
     *
     * @param array  $conf        Connection settings; the keys 'host', 'port',
     *                            'user', 'pwd' and 'vhost' are read.
     * @param string $messageBody Message payload.
     */
    public function send_mq($conf, $messageBody)
    {
        $connection = new AMQPStreamConnection(
            $conf['host'],
            $conf['port'],
            $conf['user'],
            $conf['pwd'],
            $conf['vhost']
        );

        // try/finally so the connection is released even when declaring the
        // topology or publishing throws.
        try {
            // Open a channel on the connection.
            $channel = $connection->channel();

            // Invoked when the broker acknowledges a published message.
            $channel->set_ack_handler(function (AMQPMessage $message) {
                $this->writeToLog($message->body, 'success');
            });

            // Invoked when the broker negatively acknowledges a published message.
            $channel->set_nack_handler(function (AMQPMessage $message) {
                $this->writeToLog($message->body, 'fail');
            });

            // Put the channel into publisher-confirm mode.
            $channel->confirm_select();

            // Durable, non-auto-delete direct exchange.
            $channel->exchange_declare(self::EXCHANGE, AMQPExchangeType::DIRECT, false, true, false);
            // Durable, non-exclusive, non-auto-delete queue.
            $channel->queue_declare(self::QUEUE, false, true, false, false);
            // Bind with the default (empty) routing key, which matches the
            // empty routing key used by basic_publish() below.
            $channel->queue_bind(self::QUEUE, self::EXCHANGE);

            $message = new AMQPMessage($messageBody, [
                'content_type' => 'text/plain',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]);
            $channel->basic_publish($message, self::EXCHANGE);

            // Block until the broker has acked/nacked everything published above.
            // (The earlier extra call before basic_publish() was dropped: nothing
            // had been published at that point, so there was nothing to wait for.)
            $channel->wait_for_pending_acks();

            $channel->close();
        } finally {
            $connection->close();
        }
    }

    /**
     * Echoes the confirm result for a message.
     *
     * @param string $messageBody Body of the confirmed message.
     * @param string $msg         Outcome label ("success" or "fail").
     */
    private function writeToLog($messageBody, $msg)
    {
        echo "交換機".self::EXCHANGE."隊列".self::QUEUE."消息".$messageBody.'狀态'.$msg.PHP_EOL;
    }
}