queue配置
首先說明一下我之前的項目中如何使用queue的。
我們現在的項目都是用的symfony,老一點的項目用的symfony1.4,新一點的項目用的都是symfony2。symfony用起來整體感覺還是很爽的,尤其symfony2,整體上來講使用了很多java裡面架構的設計思想。但是他不支援queue。在symfony,我們使用queue也經曆了幾個過程。最開始使用張堰同學的httpsqs。這個簡單使用,但是存在單點。畢竟我們的項目還是正式對外服務的,是以我們研究了Apache旗下的開源項目ActiveMQ,研究研究發現還有Apache旗下還有更新的MQ,那就是Apollo。最後我們決定使用的Apollo。
queue在我們的項目中主要的應用場景就是異步處理一些比較耗時的功能,比如同步第三方資料、資料有變動了同步通知到我們的第三方資料使用者等等。我們大緻的思路是這樣的,在各個controller裡面如果需要異步處理的,就把一個json對象encode一下,塞到Apollo裡面。再寫一個work的Command,在這個Command中解析json對象,根據裡面的action和參數決定來調用不同的方法處理。根據業務需要同時在不同的機器上運作Command作為守護程序一直跑着,也算實作異步多任務處理應用的方案。就這麼一直使用着,直到發現了laravel。打算研究一下。如果可能替代一下也不是不可能。呵呵。
由于才開始學習,當然直接上laravel5。routes、controller、view都基本上和symfony差别不到,上手倒是不困難。最後研究一下queue。
1、安裝laravle,使用composer,倒是很簡單。
?
1 2 | |
把~/.composer/vendor/bin 加入到環境變量中。
?
1 | |
就可以直接在指令行中使用laravel了。試一下。
?
1 | |
能夠看到下面的,就代表成功了。
?
1 | |
2、建立項目。
?
1 | |
3、配置redis和queue。
4、建立controller,
?
1 | |
在controller的action中push100個queue的任務。
?
1 2 3 | |
5、建立queue的Command
?
1 | |
修改app/Commands/SendEmail.php,添加一個私有變量。
?
1 | |
同時修改構造函數。
?
1 2 3 4 | |
再修改的handle方法
?
1 2 3 4 5 | |
6、修改routes
?
1 2 3 4 | |
7、監聽queue
?
1 | |
為了驗證多任務處理,我們同時開三個視窗運作同樣的指令。
8、用laravel内建的server啟動服務
?
1 | |
打開浏覽器,通路http://localhost:8080/頁面。當然也可以用nginx,apache之類的。但是需要各種配置,還是内建的使用友善。
在控制台就能看到各個queue執行的情況了,如下圖。可以看到100個任務被三個work平分了。

到此,基本達到了我想要的效果。驗證了laravel可以簡單實作queue,并且可以多任務處理。
make command生成的代碼中use App\Commands\Command ,但是運作時提示沒有這個檔案。 解決辦法,修改為 use Illuminate\Console\Command; 不知道為什麼會出現這個低級問題,難道是我mac系統問題,還是我的人品問題。
在controller的action中push隊列的時候,沒有異步執行,還是在action的腳本中執行的。 發現是配置問題,原來不僅僅要修改config中的queue.php,還要修改.evn中相關配置。 雖然問題解決了,但是還是覺得蛋疼,不能了解。還需要在學習學習laravel。
異步隊列使用方法
1.配置
關于隊列的定義,這裡就不作介紹了。我們要使用異步隊列就有兩個關鍵:
(1)存儲隊列的地方
(2)執行任務的服務
打開 config/queue.php ,這是Laravel5關于隊列的配置檔案。首先我們可以通過 default 參數指定預設隊列驅動,預設配置是 sync , 這是同步隊列,我們要做異步隊列首先就要改變這裡。假設我們用 database 作為驅動,隊列任務将會存放在資料庫中,而我們後面會另外啟動一個背景服務來處理隊列任務,這就是異步方式了。
?
1 | |
修改完配置後,我們需要建立一個表來存放隊列任務,Laravel5已經在自帶artisan指令中内置了一個指令用來生成資料遷移,隻需要兩條指令即可,當然你得實作配置好資料庫連接配接。
?
1 2 | |
這樣就自動在資料庫中建立了 jobs 表。
2.啟動隊列監聽服務
通過下面這條指令啟動隊列監聽服務,它會自動處理 jobs 表中的隊列任務:
?
1 | |
在linux中,如果想讓它在背景執行,可以這樣:
?
1 | |
3.添加隊列任務
關于隊列任務的添加,手冊裡說的比較詳細,這裡就簡單舉個例子吧。
首先,通過artisan建立一個隊列指令:
?
1 | |
這樣會生成 app/Commands/SendEmail.php 這個類檔案,這個類會被辨別為隊列指令,你可以在 handle 方法中寫自己的業務邏輯。
在控制器中,可以簡單通過 Bus::dispatch 分發任務:
?
1 | |
你會發現任務不會立即執行,而是被放到 jobs 表中,由隊列監聽服務處理。
更詳細的用法建議參考 command bus 和 queue 相關的手冊章節。
在之前項目進行采集資料時,采用kafka消息隊列,也挺不錯的,接下來我們來搭建kafka消息隊列:
mac安裝kafka
1.安裝最新版的kafka
brew install kafka
這将安裝所有的依賴,包括zookeeper
2.啟動zookeeper
brew services start zookeeper //啟動zookeeper
zkServer start //或者這樣啟動
可以用
brew info zookeeper
指令檢視zookeeper的相關資訊,包括啟動指令
3.啟動kafka
brew services start kafka //啟動kafka
kafka-server-start /usr/local/etc/kafka/server.properties //或者這樣啟動
同樣可以用
brew info kafka
指令檢視kafka的相關資訊,包括啟動指令
4. 建立一個topic
建立了一個名字為
test
的
topic
,
topic
的名字最好是全e文,不要有
_ .
等特殊符号,可以用以下指令檢視建立的
topic
/usr/local/bin/kafka-topics --list --zookeeper localhost: //檢視topic
/usr/local/bin/kafka-topics --delete --zookeeper localhost: --topic entere //删除名為entere的topic
5. 發送消息 producer
/usr/local/bin/kafka-console-producer --broker-list localhost: --topic test
hello this is a test message
6. 消費消息 consumer
/usr/local/bin/kafka-console-consumer --zookeeper localhost: --topic test --from-beginning
7. 配置多個broker叢集
到目前為止,我們都是在單個broker上運作的,但是這沒啥好玩的。對于Kafka來說,單個broker其實就是一個大小為1的叢集,是以對于啟動多個broker的執行個體來說,道理也是一樣的,并沒有太多變化。但是為了感覺一下他,就讓我們将我們的叢集擴充道3個節點(仍然全部運作在我們的本地機器上)。 首先我們為每一個broker建一個配置檔案:
cp config/server.properties config/server-.properties
cp config/server.properties config/server-.properties
現在,編輯這些新檔案,并設定以下屬性:
config/server-.properties:
broker.id=
port=
log.dir=/tmp/kafka-logs-
config/server-.properties:
broker.id=
port=
log.dir=/tmp/kafka-logs-
其中broker.id屬性是一個不重複的常量,用來表示叢集中每個節點的名字。我們在這裡不得不重寫port和log.dir,這隻是因為我們是在同一台機器上運作這些指令,而我們要防止多個borker使用同一個端口注冊而覆寫彼此的内容。
我們已經有了Zookeeper并且我們的單節點已經啟動,是以我們現在需要啟動這兩個新節點:
/usr/local/bin/kafka-server-start config/server-.properties &
/usr/local/bin/kafka-server-start config/server-.properties &
現在建立一個有三個備份因子的新topic:
/usr/local/bin/kafka-topics --create --zookeeper localhost: --replication-factor --partitions --topic my-replicated-topic
好了,現在我們有一個叢集了,但是我們怎麼知道每個個broker都在做什麼呢?讓我們運作
describe topics
指令來看看:
/usr/local/bin/kafka-topics --describe --zookeeper localhost: --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount: ReplicationFactor: Configs:
Topic: my-replicated-topic Partition: Leader: Replicas: ,, Isr: ,,
這是上面輸出的說明。第一行給出了所有分區的總結,此外每一行都是一個分區的資訊。因為我們現在在這個topic上隻有兩個分區,是以就隻有兩行。
"leader" 負責給定分區中所有的讀和寫的任務。分區将随即選取一個節點作為leader。
“replicas” 列出了所有目前分區中的副本節點。不論這些節點是否是leader或者是否處于激活狀态,都會被列出來。
“isr” 是表示“在同步中”的副本節點的清單。是replicas清單的一個子集,包含了目前處于激活狀态的節點,并且leader節點開頭。
注意在我們的例子中,節點1該topic僅有的一個分區中的leader節點。
我們可以在之前我們建立的topic中運作同樣的指令,來看看是什麼情況:
/usr/local/bin/kafka-topics --describe --zookeeper localhost: --topic test
Topic:test PartitionCount: ReplicationFactor: Configs:
Topic: test Partition: Leader: Replicas: Isr:
看,和猜測的一樣 -- 在之前的topic下沒有副本節點,且其運作在server 0上,它是我們在建立topic時在叢集中建立的唯一一個server。
讓我們向我們的新topic釋出一些消息:
/usr/local/bin/kafka-console-producer --broker-list localhost: --topic my-replicated-topic
my test message
my test message
現在讓我們消費這些消息:
/usr/local/bin/kafka-console-consumer --zookeeper localhost: --from-beginning --topic my-replicated-topic
my test message
my test message
現在讓我們測試一下容錯性。Broker 1是其中的leader,讓我們關了它:
ps | grep server-.properties
ttys002 : /System/Library/Frameworks/JavaVM.framework/Versions//Home/bin/java...
kill -
Leader節點轉移了,并且1号節點不再存在于“正在同步”的副本集合内:
/usr/local/bin/kafka-topics --describe --zookeeper localhost: --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount: ReplicationFactor: Configs:
Topic: my-replicated-topic Partition: Leader: Replicas: ,, Isr: ,
但是這些消息仍然可以用來消費,即便是原本負責寫的leader節點被關掉了:
bin/kafka-console-consumer --zookeeper localhost: --from-beginning --topic my-replicated-topic
my test message
my test message
安裝kafka的php擴充
brew install homebrew/php/php70-rdkafka
我們這裡選取了 php70-rdkafka 這個擴充,安裝後重新開機php-fpm,大功告
producer
php發送消息示例
<?php
public function handle()
{
$title = "傳喚華為,沒有赢家的阻擊戰";
$content = "美國政府繼3月份對中興下重手處罰之後,開始瞄準華為。";
//$host_list = "172.16.88.12:9092";
//$host_list = "172.16.88.11:2181/kafka/q-ksg2na7l";
$broker = "172.16.88.12:9092";
//$broker = "localhost:9092";
$kafka = new \RdKafka\Producer();
$kafka->setLogLevel(LOG_DEBUG);
$num = $kafka->addBrokers($broker);
echo "added $num brokers \r\n";
$topic = $kafka->newTopic("topic_article_publish");
for($i = ; $i < ; $i++){
$msg = [
'header'=>[
'type'=>'article_publish',
'time'=>time(),
],
'body'=>[
'title'=>$i.'--'.$title,
'content'=>$i.'__'.$content,
],
];
$topic->produce(RD_KAFKA_PARTITION_UA, , json_encode($msg,JSON_UNESCAPED_UNICODE));
echo "the $i message sended successfully \r\n";
}
}
?>
consumer
php接收消息示例
<?php
public function handle()
{
$broker = "172.16.88.12:9092";
//$broker = "localhost:9092";
$rk = new \RdKafka\Consumer();
$rk->setLogLevel(LOG_DEBUG);
$num = $rk->addBrokers($broker);
$topic = $rk->newTopic("topic_article_publish");
$topic->consumeStart(, RD_KAFKA_OFFSET_END);
//RD_KAFKA_OFFSET_BEGINNING,從partition消息隊列的開始進行consume;
//RD_KAFKA_OFFSET_END,從partition中的将要produce的下一條資訊開始(忽略即目前所有的消息)
//rd_kafka_offset_tail(5),consume 5 messages from the end,取最新5條
while (true) {
$msg = $topic->consume(, );
if(null === $msg){
} else {
if ($msg->err) {
echo $msg->errstr(), "\n";
sleep();
} else {
//var_dump($msg);
echo $msg->payload, "\n";
echo $msg->offset,"\n";
}
}
}
}
?>