剖析nsq消息队列-目录
分布式消息队列nsq,简单易用,去中心化的设计使
nsq
更健壮, nsq
充分利用了 go
语言的 goroutine
和 channel
来实现的消息处理,代码量也不大,读不了多久就没了。后期的文章我会把 nsq
的源码分析给大家看。
主要的分析路线如下
- 分析
的整体框架结构,分析如何做到的无中心化分布式拓扑结构,如何处理的单点故障。nsq
-
是如何保证消息的可靠性,如何保证消息的处理,对于消息的持久化是如何处理和扩展的。nsq
-
是如何做的消息的负载处理,即如何把合理的、不超过客户端消费能力的情况下,把消息分发到不同的客户端。nsq
-
提供的一些辅助组件。nsq
这篇帖子,介绍
nsq
的主体结构,以及他是如何做到去中心化的分布式拓扑结构,如何处理的单点故障。
几个组件是需要先大概说一下
nsqd
消息队列的主体,对消息的接收,处理和把消息分发到客户端。
nsqlookupd
nsq
拓扑结构信息的管理者,有了他才能组成一个简单易用的无中心化的分布式拓扑网络结构。
go-nsq
nsq
官方的go语言客户端,基本上市面上的主流编程语言都有相应的客户端在这里
还有可视化的组件
nsqadmin
和一些工具像
nsq_to_file
、
nsq_stat
、等等,这些在后期的帖子里会介绍
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnL3IDM5YDN4czNtMzNygDMwETNxAzM4ATOxAjMtUTO1IDNz8CX4ATOxAjMvwVN5UjM0MzLcd2bsJ2Lc12bj5ycn9Gbi52YugTMwIzZtl2Lc9CX6MHc0RHaiojIsJye.png)
使用方式
两种方式一种是直接连接另一种是通过
nsqlookupd
进行连接
直连方式
nsqd
是独立运行的,我们可以直接使用部署几个
nsqd
然后使用客户端直连的方式使用
例子
目前资源有限,我就都在一台机器上模拟了
启动两个
nsqd
./nsqd -tcp-address ":8000" -http-address ":8001" -data-path=./a
./nsqd -tcp-address ":7000" -http-address ":7001" -data-path=./b
正常启动会有类似下面的输出
[nsqd] 2019/08/29 18:42:56.928345 INFO: nsqd v1.1.1-alpha (built w/go1.12.7)
[nsqd] 2019/08/29 18:42:56.928512 INFO: ID: 538
[nsqd] 2019/08/29 18:42:56.928856 INFO: NSQ: persisting topic/channel metadata to b/nsqd.dat
[nsqd] 2019/08/29 18:42:56.935797 INFO: TCP: listening on [::]:7000
[nsqd] 2019/08/29 18:42:56.935891 INFO: HTTP: listening on [::]:7001
简单使用
func main() {
adds := []string{"127.0.0.1:7000", "127.0.0.1:8000"}
config := nsq.NewConfig()
topicName := "testTopic1"
c, _ := nsq.NewConsumer(topicName, "ch1", config)
testHandler := &MyTestHandler{consumer: c}
c.AddHandler(testHandler)
if err := c.ConnectToNSQDs(adds); err != nil {
panic(err)
}
stats := c.Stats()
if stats.Connections == 0 {
panic("stats report 0 connections (should be > 0)")
}
stop := make(chan os.Signal)
signal.Notify(stop, os.Interrupt)
fmt.Println("server is running....")
<-stop
}
type MyTestHandler struct {
consumer *nsq.Consumer
}
func (m MyTestHandler) HandleMessage(message *nsq.Message) error {
fmt.Println(string(message.Body))
return nil
}
方法
c.ConnectToNSQDs(adds)
,连接多个
nsqd
服务
然后运行多个客户端实现
这时,我们发送一个消息,
curl -d 'hello world 2' 'http://127.0.0.1:7001/pub?topic=testTopic1'
nsqd会根据他的算法,把消息分配到一个客户端
客户端的输入如下
2019/08/30 12:05:32 INF 1 [testTopic1/ch1] (127.0.0.1:7000) connecting to nsqd
2019/08/30 12:05:32 INF 1 [testTopic1/ch1] (127.0.0.1:8000) connecting to nsqd
server is running....
hello world 2
但是这种做的话,需要客户端做一些额外的工作,需要频繁的去检查所有
nsqd
的状态,如果发现出现问题需要客户端主动去处理这些问题。
总结
我使用的客户端库是官方库
go-nsq
,使用直接连
nsqd
的方式,
- 如果有
出现问题,现在的处理方式,他会每隔一段时间执行一次重连操作。想去掉这个连接信息就要额外做一些处理了。nsqd
- 如果对
进行横向扩充,只能是自己民额外的写一些代码调用nsqd
或者ConnectToNSQDs
ConnectToNSQD
去中心化连接方式 nsqlookupd
官方推荐使用连接
nsqlookupd
nsqlookupd
用于做服务的注册和发现,这样可以做到去中心化。
图中我们运行着多个
nsqd
和多个
nsqlookupd
的实例,客户端去连接
nsqlookupd
来操作
nsqd
我们要先启动
nsqlookupd
,为了演示方便,我启动两个
nsqlookupd
实例, 三个
nsqd
实例
./nsqlookupd -tcp-address ":8200" -http-address ":8201"
./nsqlookupd -tcp-address ":7200" -http-address ":7201"
为了演示横向扩充,先启动两个,客户端连接后,再启动第三个。
./nsqd -tcp-address ":8000" -http-address ":8001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./a
./nsqd -tcp-address ":7000" -http-address ":7001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./b
--lookupd-tcp-address
用于指定
lookup
的连接地址
客户端简单代码
package main
import (
"fmt"
"os"
"os/signal"
"time"
"github.com/nsqio/go-nsq"
)
func main() {
adds := []string{"127.0.0.1:7201", "127.0.0.1:8201"}
config := nsq.NewConfig()
config.MaxInFlight = 1000
config.MaxBackoffDuration = 5 * time.Second
config.DialTimeout = 10 * time.Second
topicName := "testTopic1"
c, _ := nsq.NewConsumer(topicName, "ch1", config)
testHandler := &MyTestHandler{consumer: c}
c.AddHandler(testHandler)
if err := c.ConnectToNSQLookupds(adds); err != nil {
panic(err)
}
stop := make(chan os.Signal)
signal.Notify(stop, os.Interrupt)
fmt.Println("server is running....")
<-stop
}
type MyTestHandler struct {
consumer *nsq.Consumer
}
func (m MyTestHandler) HandleMessage(message *nsq.Message) error {
fmt.Println(string(message.Body))
return nil
}
ConnectToNSQLookupds
就是用于连接
nsqlookupd
的,但是需要注意的是,连接的是
http
端口
7201
8201
,库
go-nsq
是通过请求其中一个
nsqlookupd
的 http 方法
http://127.0.0.1:7201/lookup?topic=testTopic1
来得到所有提供
topic=testTopic1
的
nsqd
列表信息,然后对所有的
nsqd进行
连接,
2019/08/30 13:47:26 INF 1 [testTopic1/ch1] querying nsqlookupd http://127.0.0.1:7201/lookup?topic=testTopic1
2019/08/30 13:47:26 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:7000) connecting to nsqd
2019/08/30 13:47:26 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) connecting to nsqd
目前我们已经连接了两个。
我们演示一下橫向扩充,启动第三个
nsqd
./nsqd -tcp-address ":6000" -http-address ":6001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./c
这里会有一个问题,当我启动了一个亲的
nsqd
但是他的topic是空的,我们需指定这新的
nsqd
处理哪些topic。
我们可以用
nsqadmin
查看所有的
topic
./nsqadmin --lookupd-http-address=127.0.0.1:8201 --lookupd-http-address=127.0.0.1:7201
然后去你的
nsqd
上去建topic
curl -X POST 'http://127.0.0.1:6001/topic/create?topic=testTopic1'
当然也可以自己写一些自动化的角本
查看客户端的日志输出
2019/08/30 14:56:01 INF 1 [testTopic1/ch1] querying nsqlookupd http://127.0.0.1:7201/lookup?topic=testTopic1
2019/08/30 14:56:01 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:6000) connecting to nsqd
已经连上我们的新
nsqd
了
我手动关闭一个
nsqd
客户端的日志输出已经断开了连接
2019/08/30 15:04:20 ERR 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) IO error - EOF
2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) beginning close
2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) readLoop exiting
2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) breaking out of writeLoop
2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) writeLoop exiting
2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) finished draining, cleanup exiting
2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) clean close complete
2019/08/30 15:04:20 WRN 1 [testTopic1/ch1] there are 2 connections left alive
并且
nsqd
nsqlookupd
也断开了连接,客户端再次从
nsqlookupd
取所有的
nsqd
的地址时得到的总是可用的地址。
去中心化实现原理
nsqlookupd
用于管理整个网络拓扑结构,nsqd用他实现服务的注册,客户端使用他得到所有的nsqd服务节点信息,然后所有的consumer端连接
实现原理如下,
-
把自己的服务信息广播给一个或者多个nsqd
nsqlookupd
-
连接一个或者多个客户端
,通过nsqlookupd
得到所有的nsqlookupd
的连接信息,进行连接消费,nsqd
- 如果某个
出现问题,down机了,会和nsqd
断开,这样nsqlookupd
从客户端
得到的nsqlookupd
的列表永远是可用的。nsqd
连接的是所有的客户端
,一个出问题了就用其他的连接,所以也不会受影响。nsqd
作者:李鹏
出处:http://www.cnblogs.com/li-peng/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。