天天看点

如何编写Emqx的ExHook接口,配置gRPC服务,可以接收到相关信息

作者:flyiot

1,关于钩子ExHook

钩子 钩子 (Hooks) 是一种常见的扩展机制,允许开发者在特定的事件点执行自定义代码。 EMQX 支持钩子机制,通过拦截模块间的函数调用、消息传递、事件传递您可以灵活地修改或扩展系统功能。 多语言钩子扩展(ExHook)

https://www.emqx.io/docs/zh/v5/extensions/hooks.html

EMQX 绝大部分功能都是通过钩子实现的,比如:

  • 在消息发布时对消息进行多步的流式处理(编码/解码等)。
  • 在消息发布时根据配置对消息进行缓存。
  • 使用钩子的阻塞机制实现消息的延迟发布。
如何编写Emqx的ExHook接口,配置gRPC服务,可以接收到相关信息

EMQX 以一个客户端在其生命周期内事件为基础,预置了大量的钩子:

名称 说明 执行时机
client.connect 处理连接报文 服务端收到客户端的连接报文时
client.connack 下发连接应答 服务端准备下发连接应答报文时
client.connected 成功接入 客户端认证完成并成功接入系统后
client.disconnected 连接断开 客户端连接层在准备关闭时
client.authenticate 连接认证 执行完 client.connect 后
client.authorize 发布订阅鉴权 执行 发布/订阅 操作前
client.subscribe 订阅主题 收到订阅报文后,执行 client.authorize 鉴权前
client.unsubscribe 取消订阅 收到取消订阅报文后
session.created 会话创建 client.connected 执行完成,且创建新的会话后
session.subscribed 会话订阅主题 完成订阅操作后
session.unsubscribed 会话取消订阅 完成取消订阅操作后
session.resumed 会话恢复 client.connected 执行完成,且成功恢复旧的会话信息后
session.discarded 会话被移除 会话由于被移除而终止后
session.takenover 会话被接管 会话由于被接管而终止后
session.terminated 会话终止 会话由于其他原因被终止后
message.publish 消息发布 服务端在发布(路由)消息前
message.delivered 消息投递 消息准备投递到客户端前
message.acked 消息回执 服务端在收到客户端发回的消息 ACK 后
message.dropped 消息丢弃 发布出的消息被丢弃后

参数说明:

名称 入参 返回
client.connect

ConnInfo:客户端连接层参数

Props:MQTT v5.0 连接报文的 Properties 属性

新的 Props
client.connack

ConnInfo:客户端连接层参数

Rc:返回码

Props:MQTT v5.0 连接应答报文的 Properties 属性

新的 Props
client.connected

ClientInfo:客户端信息参数

ConnInfo: 客户端连接层参数

-
client.disconnected

ClientInfo:客户端信息参数

ConnInfo:客户端连接层参数

ReasonCode:错误码

-
client.authenticate

ClientInfo:客户端信息参数

AuthNResult:认证结果

新的 AuthNResult
client.authorize

ClientInfo:客户端信息参数

Topic:发布/订阅的主题

PubSub:发布或订阅

AuthZResult:授权结果

新的 AuthZResult
client.subscribe

ClientInfo:客户端信息参数

Props:MQTT v5.0 订阅报文的 Properties 参数

TopicFilters:需订阅的主题列表

新的 TopicFilters
client.unsubscribe

ClientInfo:客户端信息参数

Props:MQTT v5.0 取消订阅报文的 Properties 参数

TopicFilters:需取消订阅的主题列表

新的 TopicFilters
session.created

ClientInfo:客户端信息参数

SessInfo:会话信息

-
session.subscribed

ClientInfo:客户端信息参数

Topic:订阅的主题

SubOpts:订阅操作的配置选项

-
session.unsubscribed

ClientInfo:客户端信息参数

Topic:取消订阅的主题

SubOpts:取消订阅操作的配置选项

-
session.resumed

ClientInfo:客户端信息参数

SessInfo:会话信息

-
session.discarded

ClientInfo:客户端信息参数

SessInfo:会话信息

-

2,使用python开发ExHook服务

项目代码参考,需要4.3 + 以上的版本支持,我这里使用的是5.1的版本:

https://github.com/emqx/emqx-extension-examples

安装python的 gRPC 和 gRPC Tools:

bashgit clone https://github.com/emqx/emqx-extension-examples
cd exhook-svr-python
python3 -m pip install grpcio grpcio-tools
           

编译 *.proto 并生产服务代码:

bashpython3 -m grpc_tools.protoc -I./protos --python_out=. --grpc_python_out=. ./protos/exhook.proto
           

运行服务,端口 9000

shellpython3 exhook_server.py
           

python 的代码空实现,直接展示数据:

python# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Python implementation of the GRPC exhook server."""

from concurrent import futures
import logging
from multiprocessing.sharedctypes import Value

import grpc

import exhook_pb2
import exhook_pb2_grpc

class HookProvider(exhook_pb2_grpc.HookProviderServicer):

    def OnProviderLoaded(self, request, context):
        print("OnProviderLoaded:", request)
        specs = [exhook_pb2.HookSpec(name="client.connect"),
                 exhook_pb2.HookSpec(name="client.connack"),
                 exhook_pb2.HookSpec(name="client.connected"),
                 exhook_pb2.HookSpec(name="client.disconnected"),
                 exhook_pb2.HookSpec(name="client.authenticate"),
                 exhook_pb2.HookSpec(name="client.authorize"),
                 exhook_pb2.HookSpec(name="client.subscribe"),
                 exhook_pb2.HookSpec(name="client.unsubscribe"),

                 exhook_pb2.HookSpec(name="session.created"),
                 exhook_pb2.HookSpec(name="session.subscribed"),
                 exhook_pb2.HookSpec(name="session.unsubscribed"),
                 exhook_pb2.HookSpec(name="session.resumed"),
                 exhook_pb2.HookSpec(name="session.discarded"),
                 exhook_pb2.HookSpec(name="session.takenover"),
                 exhook_pb2.HookSpec(name="session.terminated"),

                 exhook_pb2.HookSpec(name="message.publish"),
                 exhook_pb2.HookSpec(name="message.delivered"),
                 exhook_pb2.HookSpec(name="message.acked"),
                 exhook_pb2.HookSpec(name="message.dropped")
                ]
        return exhook_pb2.LoadedResponse(hooks=specs)

    def OnProviderUnloaded(self, request, context):
        print("OnProviderUnloaded:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientConnect(self, request, context):
        print("OnClientConnect:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientConnack(self, request, context):
        print("OnClientConnack:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientConnected(self, request, context):
        print("OnClientConnected:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientDisconnected(self, request, context):
        print("OnClientDisconnected:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientAuthenticate(self, request, context):
        print("OnClientAuthenticate:", request)
        reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", bool_result=True)
        return reply

    def OnClientAuthorize(self, request, context):
        print("OnClientAuthorize:", request)
        reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", bool_result=True)
        return reply

    def OnClientSubscribe(self, request, context):
        print("OnClientSubscribe:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientUnsubscribe(self, request, context):
        print("OnClientUnsubscribe:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionCreated(self, request, context):
        print("OnSessionCreated:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionSubscribed(self, request, context):
        print("OnSessionSubscribed:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionUnsubscribed(self, request, context):
        print("OnSessionUnsubscribed:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionResumed(self, request, context):
        print("OnSessionResumed:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionDiscarded(self, request, context):
        print("OnSessionDiscarded:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionTakenover(self, request, context):
        print("OnSessionTakenover:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionTerminated(self, request, context):
        print("OnSessionTerminated:", request)
        return exhook_pb2.EmptySuccess()

    def OnMessagePublish(self, request, context):
        print("OnMessagePublish:", request)
        nmsg = request.message
        #nmsg.payload = b"hardcode payload by exhook-svr-python :)"

        reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)
        return reply

    ## case2: stop publish the 't/d' messages
    #def OnMessagePublish(self, request, context):
    #    nmsg = request.message
    #    if nmsg.topic == 't/d':
    #        nmsg.payload = b""
    #        nmsg.headers['allow_publish'] = b"false"
    #
    #    reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)
    #    return reply

    def OnMessageDelivered(self, request, context):
        print("OnMessageDelivered:", request)
        return exhook_pb2.EmptySuccess()

    def OnMessageDropped(self, request, context):
        print("OnMessageDropped:", request)
        return exhook_pb2.EmptySuccess()

    def OnMessageAcked(self, request, context):
        print("OnMessageAcked:", request)
        return exhook_pb2.EmptySuccess()

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    exhook_pb2_grpc.add_HookProviderServicer_to_server(HookProvider(), server)
    server.add_insecure_port('[::]:9000')
    server.start()

    print("Started gRPC server on [::]:9000")

    server.wait_for_termination()


if __name__ == '__main__':
    logging.basicConfig()
    serve()

           

3,通过emqx的管理界面配置ExHook的gRPC服务

配置服务和端口地址:

如何编写Emqx的ExHook接口,配置gRPC服务,可以接收到相关信息

服务可以访问成功,并且可以注册显示钩子的数量是 19 说明代码运行成功。

如何编写Emqx的ExHook接口,配置gRPC服务,可以接收到相关信息

19个钩子服务

如何编写Emqx的ExHook接口,配置gRPC服务,可以接收到相关信息

当启动钩子,设备上下线的时候可以触发gRPC服务的调用:

4,创建一个设备,当设备上线下线,展示消息日志

当钩子启用的触发的消息:

jsonOnProviderLoaded: broker {
  version: "5.1.2"
  sysdescr: "EMQX"
  uptime: 35319623
  datetime: "2023-07-30T10:32:07.659816096+00:00"
}
meta {
  node: "[email protected]"
  version: "5.1.2"
  sysdescr: "EMQX"
  cluster_name: "emqxcl"
}

           

当设备连接出发的消息:

jsonOnClientConnect: conninfo {
  node: "[email protected]"
  clientid: "emqx_OTIxNz"
  peerhost: "172.18.0.1"
  sockport: 8083
  proto_name: "MQTT"
  proto_ver: "5"
  keepalive: 60
}
props {
  name: "Session-Expiry-Interval"
  value: "0"
}
meta {
  node: "[email protected]"
  version: "5.1.2"
  sysdescr: "EMQX"
  cluster_name: "emqxcl"
}
           

当设备鉴权触发信息:

jsonOnClientAuthenticate: clientinfo {
  node: "[email protected]"
  clientid: "emqx_OTIxNz"
  peerhost: "172.18.0.1"
  sockport: 8083
  protocol: "mqtt"
  anonymous: true
}
meta {
  node: "[email protected]"
  version: "5.1.2"
  sysdescr: "EMQX"
  cluster_name: "emqxcl"
}

           

当设备连接成功:

jsonOnClientConnected: clientinfo {
  node: "[email protected]"
  clientid: "emqx_OTIxNz"
  peerhost: "172.18.0.1"
  sockport: 8083
  protocol: "mqtt"
  anonymous: true
}
meta {
  node: "[email protected]"
  version: "5.1.2"
  sysdescr: "EMQX"
  cluster_name: "emqxcl"
}
           

当消息发送触发信息:

bashOnMessageDelivered: clientinfo {
  node: "[email protected]"
  clientid: "emqx_OTIxNz"
  peerhost: "172.18.0.1"
  sockport: 8083
  protocol: "mqtt"
  anonymous: true
}
message {
  node: "[email protected]"
  id: "000601B1E4DB27A06EE4010006430002"
  from: "emqx_OTIxNz"
  topic: "testtopic/1"
  payload: "{ \"msg\": \"hello\" }"
  timestamp: 1690713420670
  headers {
    key: "peerhost"
    value: "172.18.0.1"
  }
  headers {
    key: "protocol"
    value: "mqtt"
  }
  headers {
    key: "username"
    value: ""
  }
}
meta {
  node: "[email protected]"
  version: "5.1.2"
  sysdescr: "EMQX"
  cluster_name: "emqxcl"
}

           

其他还有很多消息,只是简单的看了几个消息。

5,总结和ActorCloud数据进行更新

剩下的事情就是把数据入库了。找到那个设备,然后把设备的在线状态更新下:

postgresql:

sqlupdate devices set "deviceStatus" = 1 where "deviceName" =  '11111111' ;
           

然后设备的在线状态就同步了;

如何编写Emqx的ExHook接口,配置gRPC服务,可以接收到相关信息

继续把同步代码补齐,然后连接数据库,修改数据。

更多文章:

fly-iot飞凡物联专栏: https://blog.csdn.net/freewebsys/category_12219758.html

继续阅读