天天看点

mongos、nanomsg、zeroMQ简述和go-mongos使用实例

mongos、nanomsg、zeroMQ简述和go-mongos使用实例

文章目录

  • ​​mongos、nanomsg、zeroMQ简述和go-mongos使用实例​​
  • ​​1. mongos、nanomsg简述​​
  • ​​2. zeroMQ、nanomsg和可扩展协议​​
  • ​​PAIR(双向通信)​​
  • ​​REQREP(客户端请求、服务器回复)​​
  • ​​PIPELINE(单向数据流)​​
  • ​​BUS(多对多通信)​​
  • ​​PUBSUB(主题广播)​​
  • ​​SURVEY(向小组提问)​​
  • ​​3. mongos及实例​​

1. mongos、nanomsg简述

来自:https://cloud.tencent.com/developer/article/1096304

nanomsg是一个消息协议SP ("Scalable Protocols"可扩展协议)的c语言实现,而mangos用golang实现了SP (“Scalable Protocols”)。

消息协议不同于通常我们说的​​消息队列​​,是指一个简单的传输会话协议。

​​mangos​​重点也是替代直接手写TCP,实现各种场合的通讯范式。

推荐:https://bravenewgeek.com/fast-scalable-networking-in-go-with-mangos/

那么mangos、nanomsg有何优点么?

主要是:简单、抽象合理、兼容多种语言、轻量级、学习成本低、比自己造的轮子好用很多。

理解的误区:mangos/nanomsg并不是消息队列,也不是RPC框架。

2. zeroMQ、nanomsg和可扩展协议

​​https://bravenewgeek.com/a-look-at-nanomsg-and-scalability-protocols/​​

可以简单理解这些网络框架和协议是对TCP、PGM、IPC、ITC等协议的封装,提供新的接口便于分布式环境下的通信,而zeroMQ较差的可扩展性将其局限于某一些协议,为了解决扩展性等一些其它的问题出现了Nanomsg,Nanomsg 通过为传输和消息传递协议提供可插入的接口来解决这个问题。这意味着支持超出标准集 PUB/SUB、REQ/REP 等的新传输(例如 WebSockets)和新消息模式。

也许最有趣的是 nanomsg 与 ZeroMQ 的哲学背离。nanomsg 不是作为一个通用的网络库,而是打算通过实现所谓的“可扩展性协议”来提供用于构建可扩展和高性能分布式系统的“乐高积木”。这些可扩展协议是通信模式,它们是网络堆栈传输层之上的抽象。这些协议彼此完全分离,因此每个协议都可以体现明确定义的分布式算法。正如 nanomsg 的作者 Martin Sustrik 所说,其目的是通过​​IETF​​标准化协议规范。(zeroMQ可以理解成网络库,而NanoMsg更像按照应用场景定义新的可扩展协议,具体选择那一种则需要看应用场景,就分布式场景下NanoMsg的可扩展性可能更适合分布式复杂多变场景)

Nanomsg 目前定义了六种不同的可扩展性协议:PAIR、REQREP、PIPELINE、BUS、PUBSUB 和 SURVEY。

PAIR(双向通信)

PAIR 在两个端点之间实现简单的一对一、双向通信。两个节点可以相互发送消息。

mongos、nanomsg、zeroMQ简述和go-mongos使用实例

REQREP(客户端请求、服务器回复)

REQREP 协议定义了一种用于构建无状态服务来处理用户请求的模式。客户端发送请求,服务器接收请求,进行一些处理,然后返回响应。

mongos、nanomsg、zeroMQ简述和go-mongos使用实例

PIPELINE(单向数据流)

PIPELINE 提供单向数据流,这对于创建负载平衡的处理管道非常有用。生产者节点提交分布在消费者节点之间的工作。

mongos、nanomsg、zeroMQ简述和go-mongos使用实例

BUS(多对多通信)

BUS 允许从每个对等点发送的消息传递到组中的每个其他对等点。

mongos、nanomsg、zeroMQ简述和go-mongos使用实例

PUBSUB(主题广播)

PUBSUB 允许发布者向零个或多个订阅者多播消息。订阅者可以连接到多个发布者,可以订阅特定的主题,允许他们只接收与他们相关的消息。

mongos、nanomsg、zeroMQ简述和go-mongos使用实例

SURVEY(向小组提问)

最后一个可扩展性协议是 SURVEY。SURVEY 模式与 PUBSUB 的相似之处在于来自一个节点的消息被广播到整个组,但不同的是组中的每个节点都 响应该消息。这开辟了各种各样的应用程序,因为它使您可以快速轻松地一次性查询大量系统的状态。调查受访者必须在调查员配置的时间窗口内做出回应。

mongos、nanomsg、zeroMQ简述和go-mongos使用实例

这里面文章最后也提到了关于c还是c++还开发zeroMQ的说明,了解这些内容感觉也是蛮有意思的。

3. mongos及实例

关于zeroMQ、manomsg的分析我们不再继续,感兴趣的可以在上面的链接中继续查看(https://bravenewgeek.com/a-look-at-nanomsg-and-scalability-protocols/),感觉分析的还是比较全面的。我们项目中使用了mongos用于边缘网关和摄像头的通信以及提供了人脸分析结果接口(使用的PUBSUB的通信方式,边缘网关中部分进程通过onvif协议拉取摄像头的流数据后通过ffmpeg和gstreamer进行处理来检测人脸并分析对应特征数据,之后通过pub的方式提供数据,而另一些人脸抓拍处理上报或者关联业务处理的进程则是通过mongos来sub人脸属性信息的,这种方式很像MQTT的协议处理方式,但是我们不需要专门搭建中间件,仅仅只是用于进程间通信,所以Nanomsg基于场景分类的可扩展协议是非常棒的,非常适合不同的通信场景)。

可以看这里:https://bravenewgeek.com/fast-scalable-networking-in-go-with-mangos/

这里给了PUBSUB和SURVEY的实例,当然,也可以在mongos的GitHub(https://github.com/nanomsg/mangos-v1/tree/v1.4.0/examples)上去查看,目前看似乎国内使用的还不是很多或者说中文资料不是很多,但从场景来看还是有很多场景可以使用的。

创建文件test.go:

// Copyright 2018 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
// You may obtain a copy of the license at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// pubsub implements a publish/subscribe example.  server is a listening
// pub socket, and clients are dialing sub sockets.
//
// To use:
//
//   $ go build .
//   $ url=tcp://127.0.0.1:40899
//   $ ./pubsub server $url server & server=$! && sleep 1
//   $ ./pubsub client $url client0 & client0=$!
//   $ ./pubsub client $url client1 & client1=$!
//   $ ./pubsub client $url client2 & client2=$!
//   $ sleep 5
//   $ kill $server $client0 $client1 $client2
//
package main

import (
    "fmt"
    "os"
    "time"

    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/pub"
    "nanomsg.org/go-mangos/protocol/sub"
    "nanomsg.org/go-mangos/transport/ipc"
    "nanomsg.org/go-mangos/transport/tcp"
)

func die(format string, v ...interface{}) {
    fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
    os.Exit(1)
}

func date() string {
    return time.Now().Format(time.ANSIC)
}

func server(url string) {
    var sock mangos.Socket
    var err error
    if sock, err = pub.NewSocket(); err != nil {
        die("can't get new pub socket: %s", err)
    }
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    if err = sock.Listen(url); err != nil {
        die("can't listen on pub socket: %s", err.Error())
    }
    for {
        // Could also use sock.RecvMsg to get header
        d := date()
        fmt.Printf("SERVER: PUBLISHING DATE %s\n", d)
        if err = sock.Send([]byte(d)); err != nil {
            die("Failed publishing: %s", err.Error())
        }
        time.Sleep(time.Second)
    }
}

func client(url string, name string) {
    var sock mangos.Socket
    var err error
    var msg []byte

    if sock, err = sub.NewSocket(); err != nil {
        die("can't get new sub socket: %s", err.Error())
    }
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    if err = sock.Dial(url); err != nil {
        die("can't dial on sub socket: %s", err.Error())
    }
    // Empty byte array effectively subscribes to everything
    err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
    if err != nil {
        die("cannot subscribe: %s", err.Error())
    }
    for {
        if msg, err = sock.Recv(); err != nil {
            die("Cannot recv: %s", err.Error())
        }
        fmt.Printf("CLIENT(%s): RECEIVED %s\n", name, string(msg))
    }
}

func main() {
    if len(os.Args) > 2 && os.Args[1] == "server" {
        server(os.Args[2])
        os.Exit(0)
    }
    if len(os.Args) > 3 && os.Args[1] == "client" {
        client(os.Args[2], os.Args[3])
        os.Exit(0)
    }
    fmt.Fprintf(os.Stderr, "Usage: pubsub server|client <URL> <ARG>\n")
    os.Exit(1)
}      

编译文件(通过module模式创建编译时使用如下方式,否则可直接编译运行):

if [ "$1" == "arm" ]
then
  export GOARCH="arm"
  echo $GOARCH
  export GOOS="linux"
  echo $GOOS
fi
go build -mod=mod      

结果(运行方式在代码中已经给了提示,通过设置url变量之后传参运行即可):

继续阅读