天天看點

【php消息隊列RabbitMQ】window環境php_amqp擴充安裝+php CodeIgniter(ci)架構+RabbitMQ使用一、window下安裝消息隊列服務二、window環境php_amqp擴充安裝三、php CodeIgniter(ci)架構使用RabbitMQ消息隊列

一、window下安裝消息隊列服務

1、RabbitMQ(下載下傳位址:https://www.rabbitmq.com/download.html)依賴erlang(下載下傳位址:http://www.erlang.org/downloads),是以先安裝erlang,然後再安裝RabbitMQ,直接下一步安裝即可;

2、找到RabbitMQ安裝目錄,如 C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.17\sbin,按住鍵盤Shift鍵點選右鍵,選擇在此打開指令視窗,

【php消息隊列RabbitMQ】window環境php_amqp擴充安裝+php CodeIgniter(ci)架構+RabbitMQ使用一、window下安裝消息隊列服務二、window環境php_amqp擴充安裝三、php CodeIgniter(ci)架構使用RabbitMQ消息隊列

,然後輸入(激活插件):rabbitmq-plugins enable rabbitmq_management。

最後重新開機伺服器:net stop RabbitMQ && net start RabbitMQ

3、在浏覽器中輸入:http://127.0.0.1:15672,預設賬号密碼為:guest

二、window環境php_amqp擴充安裝

1、安裝前需要檢視php版本,phpinfo()

【php消息隊列RabbitMQ】window環境php_amqp擴充安裝+php CodeIgniter(ci)架構+RabbitMQ使用一、window下安裝消息隊列服務二、window環境php_amqp擴充安裝三、php CodeIgniter(ci)架構使用RabbitMQ消息隊列

2、安裝擴充

1)下載下傳擴充 http://pecl.php.net/package ,選擇最新版本點選DLL

【php消息隊列RabbitMQ】window環境php_amqp擴充安裝+php CodeIgniter(ci)架構+RabbitMQ使用一、window下安裝消息隊列服務二、window環境php_amqp擴充安裝三、php CodeIgniter(ci)架構使用RabbitMQ消息隊列

2) 根據前面的php版本下載下傳對應包

【php消息隊列RabbitMQ】window環境php_amqp擴充安裝+php CodeIgniter(ci)架構+RabbitMQ使用一、window下安裝消息隊列服務二、window環境php_amqp擴充安裝三、php CodeIgniter(ci)架構使用RabbitMQ消息隊列

3)把解壓出的php_amqp.dll檔案複制到php對應的ext目錄下(C:\soft\php-7.3.4\ext)

4)修改php.ini檔案加上: extension=php_amqp

5)修改apache下的httpd.conf檔案(C:\soft\Apache24\conf\httpd.conf),在做好添加:LoadFile  "C:/soft/php-7.3.4/rabbitmq.4.dll"

6)重新開機apache,再次檢視phpinfo(),如圖所示,表示安裝成功。

【php消息隊列RabbitMQ】window環境php_amqp擴充安裝+php CodeIgniter(ci)架構+RabbitMQ使用一、window下安裝消息隊列服務二、window環境php_amqp擴充安裝三、php CodeIgniter(ci)架構使用RabbitMQ消息隊列

三、php CodeIgniter(ci)架構使用RabbitMQ消息隊列

1、安裝擴充類:在項目目錄下打開CMD視窗,輸入composer require php-amqplib/php-amqplib 安裝

【php消息隊列RabbitMQ】window環境php_amqp擴充安裝+php CodeIgniter(ci)架構+RabbitMQ使用一、window下安裝消息隊列服務二、window環境php_amqp擴充安裝三、php CodeIgniter(ci)架構使用RabbitMQ消息隊列

如果沒有正常安裝,可以先執行composer update後在回來執行composer require php-amqplib/php-amqplib

2、使用

【php消息隊列RabbitMQ】window環境php_amqp擴充安裝+php CodeIgniter(ci)架構+RabbitMQ使用一、window下安裝消息隊列服務二、window環境php_amqp擴充安裝三、php CodeIgniter(ci)架構使用RabbitMQ消息隊列

在ci項目控制器目錄下建立RabbitMQ.php檔案,D:\wwwroot\CodeIgniter\application\controllers\RabbitMQ.php

<?php
defined('BASEPATH') OR exit('No direct script access allowed');
/**
 * Created by PhpStorm.
 * User: JASON
 * Date: 2019/8/2
 * Time: 17:13
 * RabbitMQ消息隊列
 */
class RabbitMQ extends CI_Controller
{
    public $AMQConfig; //配置資訊
    public $exchangeName; //交換機名
    public $queueName; //隊列名
    public $routeName;  //路由key

    public function __construct(){
        parent::__construct();

        //配置資訊
        $this->AMQConfig = array(
            'host' => '127.0.0.1',
            'port' => '5672',
            'login' => 'DSS',
            'password' => '123456',
            'vhost'=>'DSS'
        );
        $this->exchangeName = 'DSS'; //交換機名
        $this->queueName = 'DSS_ORDER'; //隊列名
        $this->routeName = 'DSS_ORDER'; //路由key
    }

    //消息生産者
    public function send(){
        //建立連接配接和channel
        $conn = new AMQPConnection($this->AMQConfig);
        if (!$conn->connect()) {
            die("Cannot connect to the broker!<br/>");
        }
        $channel = new AMQPChannel($conn);

        //建立交換機對象
        $ex = new AMQPExchange($channel);
        $ex->setName($this->exchangeName);
        //發送消息
        //$channel->startTransaction(); //開始事務
        for ($i = 1; $i <= 5; ++$i) {
            //消息内容
            $message = $i . ".Send Mssage Success! " . date("Y-m-d h:i:s");
            echo $i . ".Send Message:" . $ex->publish($message,  $this->routeName) . "<br/>";
        }
        //$channel->commitTransaction(); //送出事務

        $conn->disconnect();
    }

    //消費者
    public function receive(){
        //建立連接配接和channel
        $conn = new AMQPConnection($this->AMQConfig);
        if (!$conn->connect()) {
            die("Cannot connect to the broker!\n");
        }
        $channel = new AMQPChannel($conn);

        //建立交換機
        $ex = new AMQPExchange($channel);
        $ex->setName($this->exchangeName);
        $ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型(常用的有fanout、direct、topic、headers)
        $ex->setFlags(AMQP_DURABLE); //持久化
        $ex->declareExchange();
        //echo "Exchange Status:".$ex->declare()."\n";

        //建立隊列
        $q = new AMQPQueue($channel);
        $q->setName( $this->queueName);
        $q->setFlags(AMQP_DURABLE); //持久化
        $total = $q->declareQueue();
        echo "Message Total:". $total ."<br/>";

        //綁定交換機與隊列,并指定路由鍵
        $q->bind($this->exchangeName, $this->routeName);

        //阻塞模式接收消息
        echo "Message:<br/>";
        if($total) {
            for ($i=1;$i<=$total;$i++) {
                //消息者回調函數
                //處理生産者發送過來的資料
                //$q->consume('processMessage');
                //$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答

                $envelope = $q->get();
                $msg = $envelope->getBody();
                echo $msg . "<br/>"; //處理消息
                $q->ack($envelope->getDeliveryTag()); //手動發送ACK應答,通知消息隊列資料已處理,删除該資料
            }
        }
        $conn->disconnect();
    }
}