1,關于鈎子ExHook
鈎子 鈎子 (Hooks) 是一種常見的擴充機制,允許開發者在特定的事件點執行自定義代碼。 EMQX 支援鈎子機制,通過攔截子產品間的函數調用、消息傳遞、事件傳遞您可以靈活地修改或擴充系統功能。 多語言鈎子擴充(ExHook)
https://www.emqx.io/docs/zh/v5/extensions/hooks.html
EMQX 絕大部分功能都是通過鈎子實作的,比如:
- 在消息釋出時對消息進行多步的流式處理(編碼/解碼等)。
- 在消息釋出時根據配置對消息進行緩存。
- 使用鈎子的阻塞機制實作消息的延遲釋出。
EMQX 以一個用戶端在其生命周期内事件為基礎,預置了大量的鈎子:
名稱 | 說明 | 執行時機 |
client.connect | 處理連接配接封包 | 服務端收到用戶端的連接配接封包時 |
client.connack | 下發連接配接應答 | 服務端準備下發連接配接應答封包時 |
client.connected | 成功接入 | 用戶端認證完成并成功接入系統後 |
client.disconnected | 連接配接斷開 | 用戶端連接配接層在準備關閉時 |
client.authenticate | 連接配接認證 | 執行完 client.connect 後 |
client.authorize | 釋出訂閱鑒權 | 執行 釋出/訂閱 操作前 |
client.subscribe | 訂閱主題 | 收到訂閱封包後,執行 client.authorize 鑒權前 |
client.unsubscribe | 取消訂閱 | 收到取消訂閱封包後 |
session.created | 會話建立 | client.connected 執行完成,且建立新的會話後 |
session.subscribed | 會話訂閱主題 | 完成訂閱操作後 |
session.unsubscribed | 會話取消訂閱 | 完成取消訂閱操作後 |
session.resumed | 會話恢複 | client.connected 執行完成,且成功恢複舊的會話資訊後 |
session.discarded | 會話被移除 | 會話由于被移除而終止後 |
session.takenover | 會話被接管 | 會話由于被接管而終止後 |
session.terminated | 會話終止 | 會話由于其他原因被終止後 |
message.publish | 消息釋出 | 服務端在釋出(路由)消息前 |
message.delivered | 消息投遞 | 消息準備投遞到用戶端前 |
message.acked | 消息回執 | 服務端在收到用戶端發回的消息 ACK 後 |
message.dropped | 消息丢棄 | 釋出出的消息被丢棄後 |
參數說明:
名稱 | 入參 | 傳回 |
client.connect | ConnInfo:用戶端連接配接層參數 Props:MQTT v5.0 連接配接封包的 Properties 屬性 | 新的 Props |
client.connack | ConnInfo:用戶端連接配接層參數 Rc:傳回碼 Props:MQTT v5.0 連接配接應答封包的 Properties 屬性 | 新的 Props |
client.connected | ClientInfo:用戶端資訊參數 ConnInfo: 用戶端連接配接層參數 | - |
client.disconnected | ClientInfo:用戶端資訊參數 ConnInfo:用戶端連接配接層參數 ReasonCode:錯誤碼 | - |
client.authenticate | ClientInfo:用戶端資訊參數 AuthNResult:認證結果 | 新的 AuthNResult |
client.authorize | ClientInfo:用戶端資訊參數 Topic:釋出/訂閱的主題 PubSub:釋出或訂閱 AuthZResult:授權結果 | 新的 AuthZResult |
client.subscribe | ClientInfo:用戶端資訊參數 Props:MQTT v5.0 訂閱封包的 Properties 參數 TopicFilters:需訂閱的主題清單 | 新的 TopicFilters |
client.unsubscribe | ClientInfo:用戶端資訊參數 Props:MQTT v5.0 取消訂閱封包的 Properties 參數 TopicFilters:需取消訂閱的主題清單 | 新的 TopicFilters |
session.created | ClientInfo:用戶端資訊參數 SessInfo:會話資訊 | - |
session.subscribed | ClientInfo:用戶端資訊參數 Topic:訂閱的主題 SubOpts:訂閱操作的配置選項 | - |
session.unsubscribed | ClientInfo:用戶端資訊參數 Topic:取消訂閱的主題 SubOpts:取消訂閱操作的配置選項 | - |
session.resumed | ClientInfo:用戶端資訊參數 SessInfo:會話資訊 | - |
session.discarded | ClientInfo:用戶端資訊參數 SessInfo:會話資訊 | - |
2,使用python開發ExHook服務
項目代碼參考,需要4.3 + 以上的版本支援,我這裡使用的是5.1的版本:
https://github.com/emqx/emqx-extension-examples
安裝python的 gRPC 和 gRPC Tools:
bashgit clone https://github.com/emqx/emqx-extension-examples
cd exhook-svr-python
python3 -m pip install grpcio grpcio-tools
編譯 *.proto 并生産服務代碼:
bashpython3 -m grpc_tools.protoc -I./protos --python_out=. --grpc_python_out=. ./protos/exhook.proto
運作服務,端口 9000
shellpython3 exhook_server.py
python 的代碼空實作,直接展示資料:
python# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Python implementation of the GRPC exhook server."""
from concurrent import futures
import logging
from multiprocessing.sharedctypes import Value
import grpc
import exhook_pb2
import exhook_pb2_grpc
class HookProvider(exhook_pb2_grpc.HookProviderServicer):
def OnProviderLoaded(self, request, context):
print("OnProviderLoaded:", request)
specs = [exhook_pb2.HookSpec(name="client.connect"),
exhook_pb2.HookSpec(name="client.connack"),
exhook_pb2.HookSpec(name="client.connected"),
exhook_pb2.HookSpec(name="client.disconnected"),
exhook_pb2.HookSpec(name="client.authenticate"),
exhook_pb2.HookSpec(name="client.authorize"),
exhook_pb2.HookSpec(name="client.subscribe"),
exhook_pb2.HookSpec(name="client.unsubscribe"),
exhook_pb2.HookSpec(name="session.created"),
exhook_pb2.HookSpec(name="session.subscribed"),
exhook_pb2.HookSpec(name="session.unsubscribed"),
exhook_pb2.HookSpec(name="session.resumed"),
exhook_pb2.HookSpec(name="session.discarded"),
exhook_pb2.HookSpec(name="session.takenover"),
exhook_pb2.HookSpec(name="session.terminated"),
exhook_pb2.HookSpec(name="message.publish"),
exhook_pb2.HookSpec(name="message.delivered"),
exhook_pb2.HookSpec(name="message.acked"),
exhook_pb2.HookSpec(name="message.dropped")
]
return exhook_pb2.LoadedResponse(hooks=specs)
def OnProviderUnloaded(self, request, context):
print("OnProviderUnloaded:", request)
return exhook_pb2.EmptySuccess()
def OnClientConnect(self, request, context):
print("OnClientConnect:", request)
return exhook_pb2.EmptySuccess()
def OnClientConnack(self, request, context):
print("OnClientConnack:", request)
return exhook_pb2.EmptySuccess()
def OnClientConnected(self, request, context):
print("OnClientConnected:", request)
return exhook_pb2.EmptySuccess()
def OnClientDisconnected(self, request, context):
print("OnClientDisconnected:", request)
return exhook_pb2.EmptySuccess()
def OnClientAuthenticate(self, request, context):
print("OnClientAuthenticate:", request)
reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", bool_result=True)
return reply
def OnClientAuthorize(self, request, context):
print("OnClientAuthorize:", request)
reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", bool_result=True)
return reply
def OnClientSubscribe(self, request, context):
print("OnClientSubscribe:", request)
return exhook_pb2.EmptySuccess()
def OnClientUnsubscribe(self, request, context):
print("OnClientUnsubscribe:", request)
return exhook_pb2.EmptySuccess()
def OnSessionCreated(self, request, context):
print("OnSessionCreated:", request)
return exhook_pb2.EmptySuccess()
def OnSessionSubscribed(self, request, context):
print("OnSessionSubscribed:", request)
return exhook_pb2.EmptySuccess()
def OnSessionUnsubscribed(self, request, context):
print("OnSessionUnsubscribed:", request)
return exhook_pb2.EmptySuccess()
def OnSessionResumed(self, request, context):
print("OnSessionResumed:", request)
return exhook_pb2.EmptySuccess()
def OnSessionDiscarded(self, request, context):
print("OnSessionDiscarded:", request)
return exhook_pb2.EmptySuccess()
def OnSessionTakenover(self, request, context):
print("OnSessionTakenover:", request)
return exhook_pb2.EmptySuccess()
def OnSessionTerminated(self, request, context):
print("OnSessionTerminated:", request)
return exhook_pb2.EmptySuccess()
def OnMessagePublish(self, request, context):
print("OnMessagePublish:", request)
nmsg = request.message
#nmsg.payload = b"hardcode payload by exhook-svr-python :)"
reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)
return reply
## case2: stop publish the 't/d' messages
#def OnMessagePublish(self, request, context):
# nmsg = request.message
# if nmsg.topic == 't/d':
# nmsg.payload = b""
# nmsg.headers['allow_publish'] = b"false"
#
# reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)
# return reply
def OnMessageDelivered(self, request, context):
print("OnMessageDelivered:", request)
return exhook_pb2.EmptySuccess()
def OnMessageDropped(self, request, context):
print("OnMessageDropped:", request)
return exhook_pb2.EmptySuccess()
def OnMessageAcked(self, request, context):
print("OnMessageAcked:", request)
return exhook_pb2.EmptySuccess()
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
exhook_pb2_grpc.add_HookProviderServicer_to_server(HookProvider(), server)
server.add_insecure_port('[::]:9000')
server.start()
print("Started gRPC server on [::]:9000")
server.wait_for_termination()
if __name__ == '__main__':
logging.basicConfig()
serve()
3,通過emqx的管理界面配置ExHook的gRPC服務
配置服務和端口位址:
服務可以通路成功,并且可以注冊顯示鈎子的數量是 19 說明代碼運作成功。
19個鈎子服務
當啟動鈎子,裝置上下線的時候可以觸發gRPC服務的調用:
4,建立一個裝置,當裝置上線下線,展示消息日志
當鈎子啟用的觸發的消息:
jsonOnProviderLoaded: broker {
version: "5.1.2"
sysdescr: "EMQX"
uptime: 35319623
datetime: "2023-07-30T10:32:07.659816096+00:00"
}
meta {
node: "[email protected]"
version: "5.1.2"
sysdescr: "EMQX"
cluster_name: "emqxcl"
}
當裝置連接配接出發的消息:
jsonOnClientConnect: conninfo {
node: "[email protected]"
clientid: "emqx_OTIxNz"
peerhost: "172.18.0.1"
sockport: 8083
proto_name: "MQTT"
proto_ver: "5"
keepalive: 60
}
props {
name: "Session-Expiry-Interval"
value: "0"
}
meta {
node: "[email protected]"
version: "5.1.2"
sysdescr: "EMQX"
cluster_name: "emqxcl"
}
當裝置鑒權觸發資訊:
jsonOnClientAuthenticate: clientinfo {
node: "[email protected]"
clientid: "emqx_OTIxNz"
peerhost: "172.18.0.1"
sockport: 8083
protocol: "mqtt"
anonymous: true
}
meta {
node: "[email protected]"
version: "5.1.2"
sysdescr: "EMQX"
cluster_name: "emqxcl"
}
當裝置連接配接成功:
jsonOnClientConnected: clientinfo {
node: "[email protected]"
clientid: "emqx_OTIxNz"
peerhost: "172.18.0.1"
sockport: 8083
protocol: "mqtt"
anonymous: true
}
meta {
node: "[email protected]"
version: "5.1.2"
sysdescr: "EMQX"
cluster_name: "emqxcl"
}
當消息發送觸發資訊:
bashOnMessageDelivered: clientinfo {
node: "[email protected]"
clientid: "emqx_OTIxNz"
peerhost: "172.18.0.1"
sockport: 8083
protocol: "mqtt"
anonymous: true
}
message {
node: "[email protected]"
id: "000601B1E4DB27A06EE4010006430002"
from: "emqx_OTIxNz"
topic: "testtopic/1"
payload: "{ \"msg\": \"hello\" }"
timestamp: 1690713420670
headers {
key: "peerhost"
value: "172.18.0.1"
}
headers {
key: "protocol"
value: "mqtt"
}
headers {
key: "username"
value: ""
}
}
meta {
node: "[email protected]"
version: "5.1.2"
sysdescr: "EMQX"
cluster_name: "emqxcl"
}
其他還有很多消息,隻是簡單的看了幾個消息。
5,總結和ActorCloud資料進行更新
剩下的事情就是把資料入庫了。找到那個裝置,然後把裝置的線上狀态更新下:
postgresql:
sqlupdate devices set "deviceStatus" = 1 where "deviceName" = '11111111' ;
然後裝置的線上狀态就同步了;
繼續把同步代碼補齊,然後連接配接資料庫,修改資料。
更多文章:
fly-iot飛凡物聯專欄: https://blog.csdn.net/freewebsys/category_12219758.html