天天看點

laravel中如何使用消息隊列mac安裝kafka安裝kafka的php擴充producerconsumer

queue配置

首先說明一下我之前的項目中如何使用queue的。

我們現在的項目都是用的symfony,老一點的項目用的symfony1.4,新一點的項目用的都是symfony2。symfony用起來整體感覺還是很爽的,尤其symfony2,整體上來講使用了很多java裡面架構的設計思想。但是他不支援queue。在symfony,我們使用queue也經曆了幾個過程。最開始使用張堰同學的httpsqs。這個簡單使用,但是存在單點。畢竟我們的項目還是正式對外服務的,是以我們研究了Apache旗下的開源項目ActiveMQ,研究研究發現還有Apache旗下還有更新的MQ,那就是Apollo。最後我們決定使用的Apollo。

queue在我們的項目中主要的應用場景就是異步處理一些比較耗時的功能,比如同步第三方資料、資料有變動了同步通知到我們的第三方資料使用者等等。我們大緻的思路是這樣的,在各個controller裡面如果需要異步處理的,就把一個json對象encode一下,塞到Apollo裡面。再寫一個work的Command,在這個Command中解析json對象,根據裡面的action和參數決定來調用不同的方法處理。根據業務需要同時在不同的機器上運作Command作為守護程序一直跑着,也算實作異步多任務處理應用的方案。就這麼一直使用着,直到發現了laravel。打算研究一下。如果可能替代一下也不是不可能。呵呵。

由于才開始學習,當然直接上laravel5。routes、controller、view都基本上和symfony差别不到,上手倒是不困難。最後研究一下queue。

1、安裝laravle,使用composer,倒是很簡單。

?

1 2

composer global require

"laravel/installer=~1.1"

vi

~/.bash_profile

把~/.composer/vendor/bin 加入到環境變量中。

?

1

source

~/.bash_profile

就可以直接在指令行中使用laravel了。試一下。

?

1

laravel -V

能夠看到下面的,就代表成功了。

?

1

Laravel Installer version 1.2.1

2、建立項目。

?

1

laravel new guagua

3、配置redis和queue。

4、建立controller,

?

1

php artisan

make

:controller DefaultController

在controller的action中push100個queue的任務。

?

1 2 3

for

(

$i

= 0;

$i

< 100;

$i

++) {

Queue::push(

new

SendEmail(

"ssss"

.

$i

));

}

5、建立queue的Command

?

1

php artisan

make

:

command

SendEmail --queued

修改app/Commands/SendEmail.php,添加一個私有變量。

?

1

protected

$msg

;

同時修改構造函數。

?

1 2 3 4

public

function

__construct(

$msg

)

{

$this

->msg =

$msg

;

}

再修改的handle方法

?

1 2 3 4 5

public

function

handle() {

sleep(4);

echo

$this

->msg.

"\t"

.

date

(

"Y-m-d H:i:s"

).

"\n"

;

$this

->

delete

();

}

6、修改routes

?

1 2 3 4

Route::get(

'/'

, [

'as'

=>

'index'

,

'uses'

=>

'[email protected]'

]);

7、監聽queue

?

1

php artisan queue:listen

為了驗證多任務處理,我們同時開三個視窗運作同樣的指令。

8、用laravel内建的server啟動服務

?

1

php artisan serve --port 8080

打開浏覽器,通路http://localhost:8080/頁面。當然也可以用nginx,apache之類的。但是需要各種配置,還是内建的使用友善。

在控制台就能看到各個queue執行的情況了,如下圖。可以看到100個任務被三個work平分了。

laravel中如何使用消息隊列mac安裝kafka安裝kafka的php擴充producerconsumer

到此,基本達到了我想要的效果。驗證了laravel可以簡單實作queue,并且可以多任務處理。

make command生成的代碼中use App\Commands\Command ,但是運作時提示沒有這個檔案。 解決辦法,修改為 use Illuminate\Console\Command; 不知道為什麼會出現這個低級問題,難道是我mac系統問題,還是我的人品問題。

在controller的action中push隊列的時候,沒有異步執行,還是在action的腳本中執行的。 發現是配置問題,原來不僅僅要修改config中的queue.php,還要修改.evn中相關配置。 雖然問題解決了,但是還是覺得蛋疼,不能了解。還需要在學習學習laravel。

異步隊列使用方法

1.配置

關于隊列的定義,這裡就不作介紹了。我們要使用異步隊列就有兩個關鍵:

(1)存儲隊列的地方

(2)執行任務的服務

打開 config/queue.php ,這是Laravel5關于隊列的配置檔案。首先我們可以通過 default 參數指定預設隊列驅動,預設配置是 sync , 這是同步隊列,我們要做異步隊列首先就要改變這裡。假設我們用 database 作為驅動,隊列任務将會存放在資料庫中,而我們後面會另外啟動一個背景服務來處理隊列任務,這就是異步方式了。

?

1

'default'

=>

'database'

修改完配置後,我們需要建立一個表來存放隊列任務,Laravel5已經在自帶artisan指令中内置了一個指令用來生成資料遷移,隻需要兩條指令即可,當然你得實作配置好資料庫連接配接。

?

1 2

php artisan queue:table

php artisan migrate

這樣就自動在資料庫中建立了 jobs 表。

2.啟動隊列監聽服務

通過下面這條指令啟動隊列監聽服務,它會自動處理 jobs 表中的隊列任務:

?

1

php artisan queue:listen

在linux中,如果想讓它在背景執行,可以這樣:

?

1

nohup

php artisan queue:listen &

3.添加隊列任務

關于隊列任務的添加,手冊裡說的比較詳細,這裡就簡單舉個例子吧。

首先,通過artisan建立一個隊列指令:

?

1

php artisan

make

:

command

SendEmail --queued

這樣會生成 app/Commands/SendEmail.php 這個類檔案,這個類會被辨別為隊列指令,你可以在 handle 方法中寫自己的業務邏輯。

在控制器中,可以簡單通過 Bus::dispatch 分發任務:

?

1

Bus::dispatch(

new

\App\Commands\SendEmail());

你會發現任務不會立即執行,而是被放到 jobs 表中,由隊列監聽服務處理。

更詳細的用法建議參考 command bus 和 queue 相關的手冊章節。

在之前項目進行采集資料時,采用kafka消息隊列,也挺不錯的,接下來我們來搭建kafka消息隊列:

mac安裝kafka

1.安裝最新版的kafka

brew install kafka
           

這将安裝所有的依賴,包括zookeeper

2.啟動zookeeper

brew services start zookeeper //啟動zookeeper
zkServer start //或者這樣啟動
           

可以用 

brew info zookeeper

 指令檢視zookeeper的相關資訊,包括啟動指令

3.啟動kafka

brew services start kafka //啟動kafka
kafka-server-start /usr/local/etc/kafka/server.properties //或者這樣啟動
           

同樣可以用 

brew info kafka

 指令檢視kafka的相關資訊,包括啟動指令

4. 建立一個topic

建立了一個名字為

test

topic

topic

的名字最好是全e文,不要有 

_ .

等特殊符号,可以用以下指令檢視建立的 

topic

/usr/local/bin/kafka-topics --list --zookeeper localhost: //檢視topic
/usr/local/bin/kafka-topics --delete --zookeeper localhost: --topic entere //删除名為entere的topic
           

5. 發送消息 producer

/usr/local/bin/kafka-console-producer --broker-list localhost: --topic test

hello this is a test message
           

6. 消費消息 consumer

/usr/local/bin/kafka-console-consumer --zookeeper localhost: --topic test --from-beginning
           

7. 配置多個broker叢集

到目前為止,我們都是在單個broker上運作的,但是這沒啥好玩的。對于Kafka來說,單個broker其實就是一個大小為1的叢集,是以對于啟動多個broker的執行個體來說,道理也是一樣的,并沒有太多變化。但是為了感覺一下他,就讓我們将我們的叢集擴充道3個節點(仍然全部運作在我們的本地機器上)。 首先我們為每一個broker建一個配置檔案:

cp config/server.properties config/server-.properties 
cp config/server.properties config/server-.properties
           

現在,編輯這些新檔案,并設定以下屬性:

config/server-.properties:
    broker.id=
    port=
    log.dir=/tmp/kafka-logs-

config/server-.properties:
    broker.id=
    port=
    log.dir=/tmp/kafka-logs-
           

其中broker.id屬性是一個不重複的常量,用來表示叢集中每個節點的名字。我們在這裡不得不重寫port和log.dir,這隻是因為我們是在同一台機器上運作這些指令,而我們要防止多個borker使用同一個端口注冊而覆寫彼此的内容。

我們已經有了Zookeeper并且我們的單節點已經啟動,是以我們現在需要啟動這兩個新節點:

/usr/local/bin/kafka-server-start config/server-.properties &
/usr/local/bin/kafka-server-start config/server-.properties &
           

現在建立一個有三個備份因子的新topic:

/usr/local/bin/kafka-topics --create --zookeeper localhost: --replication-factor  --partitions  --topic my-replicated-topic
           

好了,現在我們有一個叢集了,但是我們怎麼知道每個個broker都在做什麼呢?讓我們運作

describe topics

指令來看看:

/usr/local/bin/kafka-topics --describe --zookeeper localhost: --topic my-replicated-topic


Topic:my-replicated-topic    PartitionCount:    ReplicationFactor: Configs:

Topic: my-replicated-topic  Partition:     Leader:    Replicas: ,, Isr: ,,
           

這是上面輸出的說明。第一行給出了所有分區的總結,此外每一行都是一個分區的資訊。因為我們現在在這個topic上隻有兩個分區,是以就隻有兩行。

"leader" 負責給定分區中所有的讀和寫的任務。分區将随即選取一個節點作為leader。

“replicas” 列出了所有目前分區中的副本節點。不論這些節點是否是leader或者是否處于激活狀态,都會被列出來。

“isr” 是表示“在同步中”的副本節點的清單。是replicas清單的一個子集,包含了目前處于激活狀态的節點,并且leader節點開頭。

注意在我們的例子中,節點1該topic僅有的一個分區中的leader節點。

我們可以在之前我們建立的topic中運作同樣的指令,來看看是什麼情況:

/usr/local/bin/kafka-topics --describe --zookeeper localhost: --topic test


Topic:test    PartitionCount:    ReplicationFactor: Configs:

Topic: test Partition:     Leader:    Replicas:  Isr: 
           

看,和猜測的一樣 -- 在之前的topic下沒有副本節點,且其運作在server 0上,它是我們在建立topic時在叢集中建立的唯一一個server。

讓我們向我們的新topic釋出一些消息:

/usr/local/bin/kafka-console-producer --broker-list localhost: --topic my-replicated-topic

my test message 
my test message 
           

現在讓我們消費這些消息:

/usr/local/bin/kafka-console-consumer --zookeeper localhost: --from-beginning --topic my-replicated-topic

my test message 
my test message 
           

現在讓我們測試一下容錯性。Broker 1是其中的leader,讓我們關了它:

ps | grep server-.properties

 ttys002    : /System/Library/Frameworks/JavaVM.framework/Versions//Home/bin/java...

kill - 
           

Leader節點轉移了,并且1号節點不再存在于“正在同步”的副本集合内:

/usr/local/bin/kafka-topics --describe --zookeeper localhost: --topic my-replicated-topic

Topic:my-replicated-topic    PartitionCount:    ReplicationFactor: Configs:

Topic: my-replicated-topic  Partition:     Leader:    Replicas: ,, Isr: ,
           

但是這些消息仍然可以用來消費,即便是原本負責寫的leader節點被關掉了:

bin/kafka-console-consumer --zookeeper localhost: --from-beginning --topic my-replicated-topic

my test message 
my test message 
           

安裝kafka的php擴充

brew install homebrew/php/php70-rdkafka
           

我們這裡選取了 php70-rdkafka 這個擴充,安裝後重新開機php-fpm,大功告

producer

php發送消息示例

<?php
    public function handle() 
    {
        $title = "傳喚華為,沒有赢家的阻擊戰";
        $content = "美國政府繼3月份對中興下重手處罰之後,開始瞄準華為。";
                    //$host_list = "172.16.88.12:9092";
        //$host_list = "172.16.88.11:2181/kafka/q-ksg2na7l";
        $broker = "172.16.88.12:9092";
        //$broker = "localhost:9092";
        $kafka = new \RdKafka\Producer();
        $kafka->setLogLevel(LOG_DEBUG);
        $num = $kafka->addBrokers($broker);
        echo "added $num brokers \r\n";
        $topic = $kafka->newTopic("topic_article_publish");
        for($i = ; $i < ; $i++){
            $msg = [
                'header'=>[
                    'type'=>'article_publish',
                    'time'=>time(),
                ],
                'body'=>[
                    'title'=>$i.'--'.$title,
                    'content'=>$i.'__'.$content,
                ],

            ];
            $topic->produce(RD_KAFKA_PARTITION_UA, , json_encode($msg,JSON_UNESCAPED_UNICODE));
            echo "the $i message sended successfully \r\n";
        }


    }
?>
           

consumer

php接收消息示例

<?php
    public function handle()
    {
        $broker = "172.16.88.12:9092";
        //$broker = "localhost:9092";
        $rk = new \RdKafka\Consumer();
        $rk->setLogLevel(LOG_DEBUG);
        $num = $rk->addBrokers($broker);
        $topic = $rk->newTopic("topic_article_publish");
        $topic->consumeStart(, RD_KAFKA_OFFSET_END);
        //RD_KAFKA_OFFSET_BEGINNING,從partition消息隊列的開始進行consume;
        //RD_KAFKA_OFFSET_END,從partition中的将要produce的下一條資訊開始(忽略即目前所有的消息)
        //rd_kafka_offset_tail(5),consume 5 messages from the end,取最新5條
        while (true) {
            $msg = $topic->consume(, );
            if(null === $msg){

            } else {
                if ($msg->err) {
                    echo $msg->errstr(), "\n";
                    sleep();

                } else {
                    //var_dump($msg);
                    echo $msg->payload, "\n";
                    echo $msg->offset,"\n";
                }
            }
        }

    }
?>