天天看点

剖析nsq消息队列(二) 去中心化源码解析

剖析nsq消息队列-目录

在上一篇帖子剖析nsq消息队列(一) 简介及去中心化实现原理中,我介绍了nsq的两种使用方式,一种是直接连接,还有一种是通过nslookup来实现去中心化的方式使用,并大概说了一下实现原理,没有什么难理解的东西,这篇帖子我把

nsq

实现去中心化的源码和其中的业物逻辑展示给大家看一下。

nsqd和nsqlookupd的通信实现

上一篇中在启动

nsqd

时我用了以下命令,我指定了一个参数

--lookupd-tcp-address

./nsqd -tcp-address ":8000"  -http-address ":8001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./a
           

--lookupd-tcp-address

用于指定

nsqlookupd

tcp

监听地址。

nsqd

nsqlookupd

的通信交流简单来说就是下图这样

剖析nsq消息队列(二) 去中心化源码解析

nsqd

启动后连接

nsqlookupd

,连接成功后,要发送一个魔法标识

nsq.MagicV1

,这个标识有啥魔法么,当然不是,他只是用于标明,客户端和服务端双方使用的信息通信版本,不能的版本有不同的处理方式,为了后期做新的消息处理版本方便吧。

nsqlookupd

的代码块

func (p *tcpServer) Handle(clientConn net.Conn) {	
	// ...
	buf := make([]byte, 4)
	_, err := io.ReadFull(clientConn, buf)
	// ...
	protocolMagic := string(buf)
	// ...
	var prot protocol.Protocol
	switch protocolMagic {
	case "  V1":
		prot = &LookupProtocolV1{ctx: p.ctx}
	default:
		// ...
		return
	}
	err = prot.IOLoop(clientConn)
	//...
}
           

这个时候的

nsqd

已经和

nsqlookupd

建立好了连接,但是这时,仅仅说明他俩连接成功。

nsqlookupd

也并没有把这个连接加到可用的

nsqd

列表里。

建立连接完成后,

nsqd

会发送

IDENTIFY

命令,这个命令里包含了nsq的基本信息

nsqd

的代码

ci := make(map[string]interface{})
		ci["version"] = version.Binary
		ci["tcp_port"] = n.RealTCPAddr().Port
		ci["http_port"] = n.RealHTTPAddr().Port
		ci["hostname"] = hostname
		ci["broadcast_address"] = n.getOpts().BroadcastAddress

		cmd, err := nsq.Identify(ci)
		if err != nil {
			lp.Close()
			return
		}
		resp, err := lp.Command(cmd)
           

包含了

nsqd

提供的

tcp

http

端口,主机名,版本等等,发送给

nsqlookupd

,

nsqlookupd

收到

IDENTIFY

命令后,解析信息然后加到

nsqd

的可用列表里

nsqlookupd

func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
	var err error
	if client.peerInfo != nil {
		return nil, protocol.NewFatalClientErr(err, "E_INVALID", "cannot IDENTIFY again")
	}
	var bodyLen int32
	err = binary.Read(reader, binary.BigEndian, &bodyLen)
	// ...
	body := make([]byte, bodyLen)
	_, err = io.ReadFull(reader, body)
	// ...	
	peerInfo := PeerInfo{id: client.RemoteAddr().String()}
	err = json.Unmarshal(body, &peerInfo)
	// ...
	client.peerInfo = &peerInfo
	// 把nsqd的连接加入到可用列表里    
	if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) {
		p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "")
	}
	// ...
	return response, nil
}
           

然后每过15秒,会发送一个

PING

心跳命令给

nsqlookupd

,这样保持存活状态,

nsqlookupd

每次收到发过来的

PING

命令后,也会记下这个

nsqd

的最后更新时间,这样做为一个筛选条件,如果长时间没有更新,就认为这个节点有问题,不会把这个节点的信息加入到可用列表。

到此为止,一个

nsqd

就把自己的信息注册到

nsqlookupd

的可用列表了,我们可以启动多个

nsqd

和多个

nsqlookupd

,为

nsqd

指定多个

nsqlookupd

,就如同我上一篇帖子写的那样

--lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200
           

nsqd

和所有的

nsqlookupd

建立连接,注册服务信息,并保持心跳,保证可用列表的更新.

nsqlookupd 挂掉的处理方式

上面我们说了

nsqd

如果出现问题,

nsqlookupd

nsqd

可用列表里就会处理掉这个连接信息。如

nsqlookupd

挂了怎么办呢

剖析nsq消息队列(二) 去中心化源码解析

目前的处理方式是这样的,

无论是心跳,还是其他命令,

nsqd

会给所有的

nsqlookup

发送信息,当

nsqd

发现

nsqlookupd

出现问题时,在每次发送命令时,会不断的进行重新连接:

func (lp *lookupPeer) Command(cmd *nsq.Command) ([]byte, error) {
	initialState := lp.state
	if lp.state != stateConnected {
		err := lp.Connect()
		if err != nil {
			return nil, err
		}
		lp.state = stateConnected
		_, err = lp.Write(nsq.MagicV1)
		if err != nil {
			lp.Close()
			return nil, err
		}
		if initialState == stateDisconnected {
			lp.connectCallback(lp)
		}
		if lp.state != stateConnected {
			return nil, fmt.Errorf("lookupPeer connectCallback() failed")
		}
	}
	// ...
}
           

如果连接成功,会再次调用

connectCallback

方法,进行

IDENTIFY

命令的调用等。

客户端和nsqlookupd、nsqd的通信实现

上一篇帖子里介绍了,客户端如何连接

nsqlookupd

来进行通信

adds := []string{"127.0.0.1:7201", "127.0.0.1:8201"}
	config := nsq.NewConfig()
	config.MaxInFlight = 1000
	config.MaxBackoffDuration = 5 * time.Second
	config.DialTimeout = 10 * time.Second

	topicName := "testTopic1"
	c, _ := nsq.NewConsumer(topicName, "ch1", config)
	testHandler := &MyTestHandler{consumer: c}

	c.AddHandler(testHandler)
	if err := c.ConnectToNSQLookupds(adds); err != nil {
		panic(err)
	}
           

需要注意

adds

里地址的端口,是

nsqlookupd

http

端口

这里我还使用上一篇帖子中的图,给大家详细分析

剖析nsq消息队列(二) 去中心化源码解析

调用方法

c.ConnectToNSQLookupds(adds)

,他的实现是访问

nsqlookupd

的http端口

http://127.0.0.1:7201/lookup?topic=testTopic1

得到提供

consumer

订阅的

topic

所有的

producers

节点信息, url返回的数据信息如下。

{
  "channels": [
    "nsq_to_file",
    "ch1"
  ],
  "producers": [
    {
      "remote_address": "127.0.0.1:58606",
      "hostname": "li-peng-mc-macbook.local",
      "broadcast_address": "li-peng-mc-macbook.local",
      "tcp_port": 8000,
      "http_port": 8001,
      "version": "1.1.1-alpha"
    },
    {
      "remote_address": "127.0.0.1:58627",
      "hostname": "li-peng-mc-macbook.local",
      "broadcast_address": "li-peng-mc-macbook.local",
      "tcp_port": 7000,
      "http_port": 7001,
      "version": "1.1.1-alpha"
    }
  ]
}
           
剖析nsq消息队列(二) 去中心化源码解析

方法

queryLookupd

就是进行的上图的操作

  • 得到提供订阅的

    topic

    nsqd

    列表
  • 进行连接
func (r *Consumer) queryLookupd() {
	retries := 0
retry:
	endpoint := r.nextLookupdEndpoint()

	// ...	
	err := apiRequestNegotiateV1("GET", endpoint, nil, &data)
	if err != nil {
		// ...
	}
	var nsqdAddrs []string
	for _, producer := range data.Producers {
		broadcastAddress := producer.BroadcastAddress
		port := producer.TCPPort
		joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port))
		nsqdAddrs = append(nsqdAddrs, joined)
	}
	// 进行连接
	for _, addr := range nsqdAddrs {
		err = r.ConnectToNSQD(addr)
		if err != nil && err != ErrAlreadyConnected {
			r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
			continue
		}
	}
}
           

如何刷新nsqd的可用列表

有新的nsqd加入,是如何处理的呢?

在调用

ConnectToNSQLookupd

时会启动一个协程

go r.lookupdLoop()

lookupdLoop

的定时循环访问

queryLookupd

更新

nsqd

的可用列表

// poll all known lookup servers every LookupdPollInterval
func (r *Consumer) lookupdLoop() {
	// ...
	var ticker *time.Ticker
	select {
	case <-time.After(jitter):
	case <-r.exitChan:
		goto exit
	}
	// 设置Interval 来循环访问 queryLookupd
	ticker = time.NewTicker(r.config.LookupdPollInterval)
	for {
		select {
		case <-ticker.C:
			r.queryLookupd()
		case <-r.lookupdRecheckChan:
			r.queryLookupd()
		case <-r.exitChan:
			goto exit
		}
	}

exit:
	// ...
}
           

处理 nsqd 的单点故障

剖析nsq消息队列(二) 去中心化源码解析

当有

nsqd

出现故障时怎么办?当前的处理方式是

  • nsqdlookupd

    会把这个故障节点从可用列表中去除,客户端从接口得到的可用列表永远都是可用的。
  • 客户端会把这个故障节点从可用节点上移除,然后要去判断是否使用了

    nsqlookup

    进行了连接,如果是则

    case r.lookupdRecheckChan <- 1

    去刷新可用列表

    queryLookupd

    ,如果不是,然后启动一个协程去定时做重试连接,如果故障恢复,连接成功,会重新加入到可用列表.

    客户端实现的代码

func (r *Consumer) onConnClose(c *Conn) {
	// ...
	// remove this connections RDY count from the consumer's total
	delete(r.connections, c.String())
	left := len(r.connections)
	// ...
	r.mtx.RLock()
	numLookupd := len(r.lookupdHTTPAddrs)
	reconnect := indexOf(c.String(), r.nsqdTCPAddrs) >= 0
	// 如果使用的是nslookup则去刷新可用列表
	if numLookupd > 0 {
		// trigger a poll of the lookupd
		select {
		case r.lookupdRecheckChan <- 1:
		default:
		}
	} else if reconnect {
		// ... 
		}(c.String())
	}
}
           

作者:李鹏

出处:http://www.cnblogs.com/li-peng/

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。