1,关于钩子ExHook
钩子 钩子 (Hooks) 是一种常见的扩展机制,允许开发者在特定的事件点执行自定义代码。 EMQX 支持钩子机制,通过拦截模块间的函数调用、消息传递、事件传递您可以灵活地修改或扩展系统功能。 多语言钩子扩展(ExHook)
https://www.emqx.io/docs/zh/v5/extensions/hooks.html
EMQX 绝大部分功能都是通过钩子实现的,比如:
- 在消息发布时对消息进行多步的流式处理(编码/解码等)。
- 在消息发布时根据配置对消息进行缓存。
- 使用钩子的阻塞机制实现消息的延迟发布。
EMQX 以一个客户端在其生命周期内事件为基础,预置了大量的钩子:
名称 | 说明 | 执行时机 |
client.connect | 处理连接报文 | 服务端收到客户端的连接报文时 |
client.connack | 下发连接应答 | 服务端准备下发连接应答报文时 |
client.connected | 成功接入 | 客户端认证完成并成功接入系统后 |
client.disconnected | 连接断开 | 客户端连接层在准备关闭时 |
client.authenticate | 连接认证 | 执行完 client.connect 后 |
client.authorize | 发布订阅鉴权 | 执行 发布/订阅 操作前 |
client.subscribe | 订阅主题 | 收到订阅报文后,执行 client.authorize 鉴权前 |
client.unsubscribe | 取消订阅 | 收到取消订阅报文后 |
session.created | 会话创建 | client.connected 执行完成,且创建新的会话后 |
session.subscribed | 会话订阅主题 | 完成订阅操作后 |
session.unsubscribed | 会话取消订阅 | 完成取消订阅操作后 |
session.resumed | 会话恢复 | client.connected 执行完成,且成功恢复旧的会话信息后 |
session.discarded | 会话被移除 | 会话由于被移除而终止后 |
session.takenover | 会话被接管 | 会话由于被接管而终止后 |
session.terminated | 会话终止 | 会话由于其他原因被终止后 |
message.publish | 消息发布 | 服务端在发布(路由)消息前 |
message.delivered | 消息投递 | 消息准备投递到客户端前 |
message.acked | 消息回执 | 服务端在收到客户端发回的消息 ACK 后 |
message.dropped | 消息丢弃 | 发布出的消息被丢弃后 |
参数说明:
名称 | 入参 | 返回 |
client.connect | ConnInfo:客户端连接层参数 Props:MQTT v5.0 连接报文的 Properties 属性 | 新的 Props |
client.connack | ConnInfo:客户端连接层参数 Rc:返回码 Props:MQTT v5.0 连接应答报文的 Properties 属性 | 新的 Props |
client.connected | ClientInfo:客户端信息参数 ConnInfo: 客户端连接层参数 | - |
client.disconnected | ClientInfo:客户端信息参数 ConnInfo:客户端连接层参数 ReasonCode:错误码 | - |
client.authenticate | ClientInfo:客户端信息参数 AuthNResult:认证结果 | 新的 AuthNResult |
client.authorize | ClientInfo:客户端信息参数 Topic:发布/订阅的主题 PubSub:发布或订阅 AuthZResult:授权结果 | 新的 AuthZResult |
client.subscribe | ClientInfo:客户端信息参数 Props:MQTT v5.0 订阅报文的 Properties 参数 TopicFilters:需订阅的主题列表 | 新的 TopicFilters |
client.unsubscribe | ClientInfo:客户端信息参数 Props:MQTT v5.0 取消订阅报文的 Properties 参数 TopicFilters:需取消订阅的主题列表 | 新的 TopicFilters |
session.created | ClientInfo:客户端信息参数 SessInfo:会话信息 | - |
session.subscribed | ClientInfo:客户端信息参数 Topic:订阅的主题 SubOpts:订阅操作的配置选项 | - |
session.unsubscribed | ClientInfo:客户端信息参数 Topic:取消订阅的主题 SubOpts:取消订阅操作的配置选项 | - |
session.resumed | ClientInfo:客户端信息参数 SessInfo:会话信息 | - |
session.discarded | ClientInfo:客户端信息参数 SessInfo:会话信息 | - |
2,使用python开发ExHook服务
项目代码参考,需要4.3 + 以上的版本支持,我这里使用的是5.1的版本:
https://github.com/emqx/emqx-extension-examples
安装python的 gRPC 和 gRPC Tools:
bashgit clone https://github.com/emqx/emqx-extension-examples
cd exhook-svr-python
python3 -m pip install grpcio grpcio-tools
编译 *.proto 并生产服务代码:
bashpython3 -m grpc_tools.protoc -I./protos --python_out=. --grpc_python_out=. ./protos/exhook.proto
运行服务,端口 9000
shellpython3 exhook_server.py
python 的代码空实现,直接展示数据:
python# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Python implementation of the GRPC exhook server."""
from concurrent import futures
import logging
from multiprocessing.sharedctypes import Value
import grpc
import exhook_pb2
import exhook_pb2_grpc
class HookProvider(exhook_pb2_grpc.HookProviderServicer):
def OnProviderLoaded(self, request, context):
print("OnProviderLoaded:", request)
specs = [exhook_pb2.HookSpec(name="client.connect"),
exhook_pb2.HookSpec(name="client.connack"),
exhook_pb2.HookSpec(name="client.connected"),
exhook_pb2.HookSpec(name="client.disconnected"),
exhook_pb2.HookSpec(name="client.authenticate"),
exhook_pb2.HookSpec(name="client.authorize"),
exhook_pb2.HookSpec(name="client.subscribe"),
exhook_pb2.HookSpec(name="client.unsubscribe"),
exhook_pb2.HookSpec(name="session.created"),
exhook_pb2.HookSpec(name="session.subscribed"),
exhook_pb2.HookSpec(name="session.unsubscribed"),
exhook_pb2.HookSpec(name="session.resumed"),
exhook_pb2.HookSpec(name="session.discarded"),
exhook_pb2.HookSpec(name="session.takenover"),
exhook_pb2.HookSpec(name="session.terminated"),
exhook_pb2.HookSpec(name="message.publish"),
exhook_pb2.HookSpec(name="message.delivered"),
exhook_pb2.HookSpec(name="message.acked"),
exhook_pb2.HookSpec(name="message.dropped")
]
return exhook_pb2.LoadedResponse(hooks=specs)
def OnProviderUnloaded(self, request, context):
print("OnProviderUnloaded:", request)
return exhook_pb2.EmptySuccess()
def OnClientConnect(self, request, context):
print("OnClientConnect:", request)
return exhook_pb2.EmptySuccess()
def OnClientConnack(self, request, context):
print("OnClientConnack:", request)
return exhook_pb2.EmptySuccess()
def OnClientConnected(self, request, context):
print("OnClientConnected:", request)
return exhook_pb2.EmptySuccess()
def OnClientDisconnected(self, request, context):
print("OnClientDisconnected:", request)
return exhook_pb2.EmptySuccess()
def OnClientAuthenticate(self, request, context):
print("OnClientAuthenticate:", request)
reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", bool_result=True)
return reply
def OnClientAuthorize(self, request, context):
print("OnClientAuthorize:", request)
reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", bool_result=True)
return reply
def OnClientSubscribe(self, request, context):
print("OnClientSubscribe:", request)
return exhook_pb2.EmptySuccess()
def OnClientUnsubscribe(self, request, context):
print("OnClientUnsubscribe:", request)
return exhook_pb2.EmptySuccess()
def OnSessionCreated(self, request, context):
print("OnSessionCreated:", request)
return exhook_pb2.EmptySuccess()
def OnSessionSubscribed(self, request, context):
print("OnSessionSubscribed:", request)
return exhook_pb2.EmptySuccess()
def OnSessionUnsubscribed(self, request, context):
print("OnSessionUnsubscribed:", request)
return exhook_pb2.EmptySuccess()
def OnSessionResumed(self, request, context):
print("OnSessionResumed:", request)
return exhook_pb2.EmptySuccess()
def OnSessionDiscarded(self, request, context):
print("OnSessionDiscarded:", request)
return exhook_pb2.EmptySuccess()
def OnSessionTakenover(self, request, context):
print("OnSessionTakenover:", request)
return exhook_pb2.EmptySuccess()
def OnSessionTerminated(self, request, context):
print("OnSessionTerminated:", request)
return exhook_pb2.EmptySuccess()
def OnMessagePublish(self, request, context):
print("OnMessagePublish:", request)
nmsg = request.message
#nmsg.payload = b"hardcode payload by exhook-svr-python :)"
reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)
return reply
## case2: stop publish the 't/d' messages
#def OnMessagePublish(self, request, context):
# nmsg = request.message
# if nmsg.topic == 't/d':
# nmsg.payload = b""
# nmsg.headers['allow_publish'] = b"false"
#
# reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)
# return reply
def OnMessageDelivered(self, request, context):
print("OnMessageDelivered:", request)
return exhook_pb2.EmptySuccess()
def OnMessageDropped(self, request, context):
print("OnMessageDropped:", request)
return exhook_pb2.EmptySuccess()
def OnMessageAcked(self, request, context):
print("OnMessageAcked:", request)
return exhook_pb2.EmptySuccess()
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
exhook_pb2_grpc.add_HookProviderServicer_to_server(HookProvider(), server)
server.add_insecure_port('[::]:9000')
server.start()
print("Started gRPC server on [::]:9000")
server.wait_for_termination()
if __name__ == '__main__':
logging.basicConfig()
serve()
3,通过emqx的管理界面配置ExHook的gRPC服务
配置服务和端口地址:
服务可以访问成功,并且可以注册显示钩子的数量是 19 说明代码运行成功。
19个钩子服务
当启动钩子,设备上下线的时候可以触发gRPC服务的调用:
4,创建一个设备,当设备上线下线,展示消息日志
当钩子启用的触发的消息:
jsonOnProviderLoaded: broker {
version: "5.1.2"
sysdescr: "EMQX"
uptime: 35319623
datetime: "2023-07-30T10:32:07.659816096+00:00"
}
meta {
node: "[email protected]"
version: "5.1.2"
sysdescr: "EMQX"
cluster_name: "emqxcl"
}
当设备连接出发的消息:
jsonOnClientConnect: conninfo {
node: "[email protected]"
clientid: "emqx_OTIxNz"
peerhost: "172.18.0.1"
sockport: 8083
proto_name: "MQTT"
proto_ver: "5"
keepalive: 60
}
props {
name: "Session-Expiry-Interval"
value: "0"
}
meta {
node: "[email protected]"
version: "5.1.2"
sysdescr: "EMQX"
cluster_name: "emqxcl"
}
当设备鉴权触发信息:
jsonOnClientAuthenticate: clientinfo {
node: "[email protected]"
clientid: "emqx_OTIxNz"
peerhost: "172.18.0.1"
sockport: 8083
protocol: "mqtt"
anonymous: true
}
meta {
node: "[email protected]"
version: "5.1.2"
sysdescr: "EMQX"
cluster_name: "emqxcl"
}
当设备连接成功:
jsonOnClientConnected: clientinfo {
node: "[email protected]"
clientid: "emqx_OTIxNz"
peerhost: "172.18.0.1"
sockport: 8083
protocol: "mqtt"
anonymous: true
}
meta {
node: "[email protected]"
version: "5.1.2"
sysdescr: "EMQX"
cluster_name: "emqxcl"
}
当消息发送触发信息:
bashOnMessageDelivered: clientinfo {
node: "[email protected]"
clientid: "emqx_OTIxNz"
peerhost: "172.18.0.1"
sockport: 8083
protocol: "mqtt"
anonymous: true
}
message {
node: "[email protected]"
id: "000601B1E4DB27A06EE4010006430002"
from: "emqx_OTIxNz"
topic: "testtopic/1"
payload: "{ \"msg\": \"hello\" }"
timestamp: 1690713420670
headers {
key: "peerhost"
value: "172.18.0.1"
}
headers {
key: "protocol"
value: "mqtt"
}
headers {
key: "username"
value: ""
}
}
meta {
node: "[email protected]"
version: "5.1.2"
sysdescr: "EMQX"
cluster_name: "emqxcl"
}
其他还有很多消息,只是简单的看了几个消息。
5,总结和ActorCloud数据进行更新
剩下的事情就是把数据入库了。找到那个设备,然后把设备的在线状态更新下:
postgresql:
sqlupdate devices set "deviceStatus" = 1 where "deviceName" = '11111111' ;
然后设备的在线状态就同步了;
继续把同步代码补齐,然后连接数据库,修改数据。
更多文章:
fly-iot飞凡物联专栏: https://blog.csdn.net/freewebsys/category_12219758.html