簡介
NSQ是Go語言編寫的,開源的分布式消息隊列中間件,其設計的目的是用來大規模地處理每天數以十億計級别的消息。NSQ 具有分布式和去中心化拓撲結構,該結構具有無單點故障、故障容錯、高可用性以及能夠保證消息的可靠傳遞的特征,是一個成熟的、已在大規模生成環境下應用的産品。
NSQ在國内公司用的很少,在使用當中愈發的覺得驚喜,比如他的簡單易用、部署快捷,再比如之前比較困擾的 延時定時消息,發現nsq 也支援,官方文檔比較全,咨詢問題時回複也非常的耐心和即時,是以我覺得有必要釋出一篇文章來介紹下nsq,惠及大衆。
nsq 有三個必要的組建nsqd、nsqlookupd、nsqadmin 其中nsqd 和 nsqlookup是必須部署的 下面我們一一介紹。
nsqd :
負責接收消息,存儲隊列和将消息發送給用戶端,nsqd 可以多機器部署,當你使用用戶端向一個topic發送消息時,可以配置多個nsqd位址,消息會随機的配置設定到各個nsqd上,nsqd優先把消息存儲到記憶體channel中,當記憶體channel滿了之後,則把消息寫到磁盤檔案中。他監聽了兩個tcp端口,一個用來服務用戶端,一個用來提供http的接口 ,nsqd 啟動時置頂下nsqlookupd位址即可:
nsqd –lookupd-tcp-address=127.0.0.1:4160
也可以指定端口 與資料目錄
nsqd –lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1 -tcp-address=127.0.0.1:4154 -http-address=”0.0.0.0:4155″ –data-path=/data/nsqdata
其他配置項可詳見官網
nsqlookupd:
主要負責服務發現 負責nsqd的心跳、狀态監測,給用戶端、nsqadmin提供nsqd位址與狀态
nsqadmin:
nsqadmin是一個web管理界面 啟動方式如下:
nsqadmin –lookupd-http-address=127.0.0.1:4161

channel詳情頁示例圖如下 ,empty可以清空目前channel的資訊,delete删除目前channel, pause是暫停消息消費。
圖中也有幾個比較重要的參數 depth目前的積壓量,in-flight代表已經投遞還未消費掉的消息,deferred是未消費的定時(延時)消息數,ready count比較重要,go的用戶端是通過設定max-in-flight 除以用戶端連接配接數得到的,代表一次推給用戶端多少條消息,或者用戶端準備一次性接受多少條消息,謹慎設定其值,因為可能造成伺服器壓力,如果消費能力比較弱,rdy建議設定的低一點比如3
Topic 和 Channel
其實nsqd相當于kafka當中的分區,channel和consumers用戶端的多個連接配接 相當于kafka的消費組,但nsq比kafka使用方式便捷概念上更容易了解
抛開與kafka的對比,nsq的topic 可以設定多個channel,因為有可能有多個業務方需要定值topic的消息,這樣互不影響,
當然一個消息會發送topic下的所有channel,然後會配置設定到不同用戶端的連接配接上,如下圖。
這篇文章主要介紹nsq的使用,源碼就不展開講,如果有興趣的同學多的話 過幾天我會再開一篇專門叙述nsq的源碼與分析。
這裡提下延時消息:
nsq支援延時消息的投遞,比如我想這條消息5分鐘之後才被投遞出去被用戶端消費,較于普通的消息投遞,多了個毫秒數,預設支援最大的毫秒數為3600000毫秒也就是60分鐘,不過這個值可以在nsqd 啟動的時候 用 -max-req-timeout參數修改最大值。
延時消息可用于以下場景,比如一個訂單超過30分鐘未付款,修改其狀态 或者給客戶發短信提醒,比如之前看到的滴滴打車訂單完成後 一定時間内未評價的可以未其設定預設值,再比如使用者的積分過期,等等場景避免了全表掃描,異步處理,kafka不支援延時消息的投遞,目前知道支援的有rabbitmq rocketmq,但是rabbitmq 有坑,有可能會逾時投遞,而rocketmq隻有阿裡雲付費版支援的比較好。
nsq延時消息的實作是用最小堆算法完成,作者繼承實作heap的一系類接口,專門寫了一個pqueque最小堆的優先隊列,在internal/pequeque 目錄可以看到相關實作,pub的時候如果chanMsg.deferred != 0則會調用channel.PutMessageDeferred方法,最終會調用繼承了go heap接口的pqueque.push方法
延時消息的處理 和普通消息一樣都是 nsqd/protocol_v2.go下messagePump 中把消息發送給用戶端 然後在queueScanWorker中分别處理,pop是peekAndShift方法中,拿目前時間 和 deferred[0]對比如果大于 就彈出發送給用戶端 如下代碼:
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
for {
select {
case c := <-workCh:
now := time.Now().UnixNano()
dirty := false
if c.processInFlightQueue(now) {
dirty = true
}
if c.processDeferredQueue(now) {
dirty = true
}
responseCh <- dirty
case <-closeCh:
return
}
}
}
func (c *Channel) processDeferredQueue(t int64) bool {
c.exitMutex.RLock()
defer c.exitMutex.RUnlock()
if c.Exiting() {
return false
}
dirty := false
for {
c.deferredMutex.Lock()
item, _ := c.deferredPQ.PeekAndShift(t)
c.deferredMutex.Unlock()
if item == nil {
goto exit
}
dirty = true
msg := item.Value.(*Message)
_, err := c.popDeferredMessage(msg.ID)
if err != nil {
goto exit
}
c.put(msg)
}
exit:
return dirty
}
func (pq *PriorityQueue) PeekAndShift(max int64) (*Item, int64) {
if pq.Len() == 0 {
return nil, 0
}
item := (*pq)[0]
if item.Priority > max {
return nil, item.Priority - max
}
heap.Remove(pq, 0)
return item, 0
}
php和go的用戶端的使用
官網用戶端連結:
Client Librariesphp用戶端之前官網有一個5年前比較老的用戶端,已經沒人維護 甚至無法運作,于是我貢獻了一個php72擴充版本
php-nsq,速度塊了近三倍,正在逐漸完善,支援各種配置與特性,目前已被官網收納,簡單介紹下使用 順便求下star
pub :
$nsqd_addr = array(
"127.0.0.1:4150",
"127.0.0.1:4154"
);
$nsq = new Nsq();
$is_true = $nsq->connect_nsqd($nsqd_addr);
for($i = 0; $i < 20; $i++){
$nsq->publish("test", "nihao");
}
延時pub :
參數 僅僅多一個毫秒參數,so easy!
$deferred = new Nsq();
$isTrue = $deferred->connectNsqd($nsqdAddr);
for($i = 0; $i < 20; $i++){
$deferred->deferredPublish("test", "message daly", 3000); // 第三值預設範圍 millisecond default : [0 < millisecond < 3600000] ,可以更改 上面已提到
}
sub :
抛異常消息可以自動重試,重試時間可以有retry_delay_time設定,多少時間後再次接收被重試的消息
$nsq_lookupd = new NsqLookupd("127.0.0.1:4161"); //the nsqlookupd tcp addr
$nsq = new Nsq();
$config = array(
"topic" => "test",
"channel" => "struggle",
"rdy" => 2, //optional , default 1
"connect_num" => 1, //optional , default 1
"retry_delay_time" => 5000, //optional, default 0 , after 5000 msec, message will be retried
);
$nsq->subscribe($nsq_lookupd, $config, function($msg){
echo $msg->payload;
echo $msg->attempts;
echo $msg->message_id;
echo $msg->timestamp;
});
go client pub
package main
import (
"github.com/nsqio/go-nsq"
)
var producer *nsq.Producer
func main() {
nsqd := "127.0.0.1:4150"
producer, err := nsq.NewProducer(nsqd, nsq.NewConfig())
producer.Publish("test", []byte("nihao"))
if err != nil {
panic(err)
}
}
go client sub
package main
import (
"fmt"
"sync"
"github.com/nsqio/go-nsq"
)
type NSQHandler struct {
}
func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
return nil
}
func testNSQ() {
waiter := sync.WaitGroup{}
waiter.Add(1)
go func() {
defer waiter.Done()
config:=nsq.NewConfig()
config.MaxInFlight=9
//建立多個連接配接
for i := 0; i<10; i++ {
consumer, err := nsq.NewConsumer("test", "struggle", config)
if nil != err {
fmt.Println("err", err)
return
}
consumer.AddHandler(&NSQHandler{})
err = consumer.ConnectToNSQD("127.0.0.1:4150")
if nil != err {
fmt.Println("err", err)
return
}
}
select{}
}()
waiter.Wait()
}
func main() {
testNSQ();
}