天天看點

如何編寫Emqx的ExHook接口,配置gRPC服務,可以接收到相關資訊

作者:flyiot

1,關于鈎子ExHook

鈎子 鈎子 (Hooks) 是一種常見的擴充機制,允許開發者在特定的事件點執行自定義代碼。 EMQX 支援鈎子機制,通過攔截子產品間的函數調用、消息傳遞、事件傳遞您可以靈活地修改或擴充系統功能。 多語言鈎子擴充(ExHook)

https://www.emqx.io/docs/zh/v5/extensions/hooks.html

EMQX 絕大部分功能都是通過鈎子實作的,比如:

  • 在消息釋出時對消息進行多步的流式處理(編碼/解碼等)。
  • 在消息釋出時根據配置對消息進行緩存。
  • 使用鈎子的阻塞機制實作消息的延遲釋出。
如何編寫Emqx的ExHook接口,配置gRPC服務,可以接收到相關資訊

EMQX 以一個用戶端在其生命周期内事件為基礎,預置了大量的鈎子:

名稱 說明 執行時機
client.connect 處理連接配接封包 服務端收到用戶端的連接配接封包時
client.connack 下發連接配接應答 服務端準備下發連接配接應答封包時
client.connected 成功接入 用戶端認證完成并成功接入系統後
client.disconnected 連接配接斷開 用戶端連接配接層在準備關閉時
client.authenticate 連接配接認證 執行完 client.connect 後
client.authorize 釋出訂閱鑒權 執行 釋出/訂閱 操作前
client.subscribe 訂閱主題 收到訂閱封包後,執行 client.authorize 鑒權前
client.unsubscribe 取消訂閱 收到取消訂閱封包後
session.created 會話建立 client.connected 執行完成,且建立新的會話後
session.subscribed 會話訂閱主題 完成訂閱操作後
session.unsubscribed 會話取消訂閱 完成取消訂閱操作後
session.resumed 會話恢複 client.connected 執行完成,且成功恢複舊的會話資訊後
session.discarded 會話被移除 會話由于被移除而終止後
session.takenover 會話被接管 會話由于被接管而終止後
session.terminated 會話終止 會話由于其他原因被終止後
message.publish 消息釋出 服務端在釋出(路由)消息前
message.delivered 消息投遞 消息準備投遞到用戶端前
message.acked 消息回執 服務端在收到用戶端發回的消息 ACK 後
message.dropped 消息丢棄 釋出出的消息被丢棄後

參數說明:

名稱 入參 傳回
client.connect

ConnInfo:用戶端連接配接層參數

Props:MQTT v5.0 連接配接封包的 Properties 屬性

新的 Props
client.connack

ConnInfo:用戶端連接配接層參數

Rc:傳回碼

Props:MQTT v5.0 連接配接應答封包的 Properties 屬性

新的 Props
client.connected

ClientInfo:用戶端資訊參數

ConnInfo: 用戶端連接配接層參數

-
client.disconnected

ClientInfo:用戶端資訊參數

ConnInfo:用戶端連接配接層參數

ReasonCode:錯誤碼

-
client.authenticate

ClientInfo:用戶端資訊參數

AuthNResult:認證結果

新的 AuthNResult
client.authorize

ClientInfo:用戶端資訊參數

Topic:釋出/訂閱的主題

PubSub:釋出或訂閱

AuthZResult:授權結果

新的 AuthZResult
client.subscribe

ClientInfo:用戶端資訊參數

Props:MQTT v5.0 訂閱封包的 Properties 參數

TopicFilters:需訂閱的主題清單

新的 TopicFilters
client.unsubscribe

ClientInfo:用戶端資訊參數

Props:MQTT v5.0 取消訂閱封包的 Properties 參數

TopicFilters:需取消訂閱的主題清單

新的 TopicFilters
session.created

ClientInfo:用戶端資訊參數

SessInfo:會話資訊

-
session.subscribed

ClientInfo:用戶端資訊參數

Topic:訂閱的主題

SubOpts:訂閱操作的配置選項

-
session.unsubscribed

ClientInfo:用戶端資訊參數

Topic:取消訂閱的主題

SubOpts:取消訂閱操作的配置選項

-
session.resumed

ClientInfo:用戶端資訊參數

SessInfo:會話資訊

-
session.discarded

ClientInfo:用戶端資訊參數

SessInfo:會話資訊

-

2,使用python開發ExHook服務

項目代碼參考,需要4.3 + 以上的版本支援,我這裡使用的是5.1的版本:

https://github.com/emqx/emqx-extension-examples

安裝python的 gRPC 和 gRPC Tools:

bashgit clone https://github.com/emqx/emqx-extension-examples
cd exhook-svr-python
python3 -m pip install grpcio grpcio-tools
           

編譯 *.proto 并生産服務代碼:

bashpython3 -m grpc_tools.protoc -I./protos --python_out=. --grpc_python_out=. ./protos/exhook.proto
           

運作服務,端口 9000

shellpython3 exhook_server.py
           

python 的代碼空實作,直接展示資料:

python# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Python implementation of the GRPC exhook server."""

from concurrent import futures
import logging
from multiprocessing.sharedctypes import Value

import grpc

import exhook_pb2
import exhook_pb2_grpc

class HookProvider(exhook_pb2_grpc.HookProviderServicer):

    def OnProviderLoaded(self, request, context):
        print("OnProviderLoaded:", request)
        specs = [exhook_pb2.HookSpec(name="client.connect"),
                 exhook_pb2.HookSpec(name="client.connack"),
                 exhook_pb2.HookSpec(name="client.connected"),
                 exhook_pb2.HookSpec(name="client.disconnected"),
                 exhook_pb2.HookSpec(name="client.authenticate"),
                 exhook_pb2.HookSpec(name="client.authorize"),
                 exhook_pb2.HookSpec(name="client.subscribe"),
                 exhook_pb2.HookSpec(name="client.unsubscribe"),

                 exhook_pb2.HookSpec(name="session.created"),
                 exhook_pb2.HookSpec(name="session.subscribed"),
                 exhook_pb2.HookSpec(name="session.unsubscribed"),
                 exhook_pb2.HookSpec(name="session.resumed"),
                 exhook_pb2.HookSpec(name="session.discarded"),
                 exhook_pb2.HookSpec(name="session.takenover"),
                 exhook_pb2.HookSpec(name="session.terminated"),

                 exhook_pb2.HookSpec(name="message.publish"),
                 exhook_pb2.HookSpec(name="message.delivered"),
                 exhook_pb2.HookSpec(name="message.acked"),
                 exhook_pb2.HookSpec(name="message.dropped")
                ]
        return exhook_pb2.LoadedResponse(hooks=specs)

    def OnProviderUnloaded(self, request, context):
        print("OnProviderUnloaded:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientConnect(self, request, context):
        print("OnClientConnect:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientConnack(self, request, context):
        print("OnClientConnack:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientConnected(self, request, context):
        print("OnClientConnected:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientDisconnected(self, request, context):
        print("OnClientDisconnected:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientAuthenticate(self, request, context):
        print("OnClientAuthenticate:", request)
        reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", bool_result=True)
        return reply

    def OnClientAuthorize(self, request, context):
        print("OnClientAuthorize:", request)
        reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", bool_result=True)
        return reply

    def OnClientSubscribe(self, request, context):
        print("OnClientSubscribe:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientUnsubscribe(self, request, context):
        print("OnClientUnsubscribe:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionCreated(self, request, context):
        print("OnSessionCreated:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionSubscribed(self, request, context):
        print("OnSessionSubscribed:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionUnsubscribed(self, request, context):
        print("OnSessionUnsubscribed:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionResumed(self, request, context):
        print("OnSessionResumed:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionDiscarded(self, request, context):
        print("OnSessionDiscarded:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionTakenover(self, request, context):
        print("OnSessionTakenover:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionTerminated(self, request, context):
        print("OnSessionTerminated:", request)
        return exhook_pb2.EmptySuccess()

    def OnMessagePublish(self, request, context):
        print("OnMessagePublish:", request)
        nmsg = request.message
        #nmsg.payload = b"hardcode payload by exhook-svr-python :)"

        reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)
        return reply

    ## case2: stop publish the 't/d' messages
    #def OnMessagePublish(self, request, context):
    #    nmsg = request.message
    #    if nmsg.topic == 't/d':
    #        nmsg.payload = b""
    #        nmsg.headers['allow_publish'] = b"false"
    #
    #    reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)
    #    return reply

    def OnMessageDelivered(self, request, context):
        print("OnMessageDelivered:", request)
        return exhook_pb2.EmptySuccess()

    def OnMessageDropped(self, request, context):
        print("OnMessageDropped:", request)
        return exhook_pb2.EmptySuccess()

    def OnMessageAcked(self, request, context):
        print("OnMessageAcked:", request)
        return exhook_pb2.EmptySuccess()

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    exhook_pb2_grpc.add_HookProviderServicer_to_server(HookProvider(), server)
    server.add_insecure_port('[::]:9000')
    server.start()

    print("Started gRPC server on [::]:9000")

    server.wait_for_termination()


if __name__ == '__main__':
    logging.basicConfig()
    serve()

           

3,通過emqx的管理界面配置ExHook的gRPC服務

配置服務和端口位址:

如何編寫Emqx的ExHook接口,配置gRPC服務,可以接收到相關資訊

服務可以通路成功,并且可以注冊顯示鈎子的數量是 19 說明代碼運作成功。

如何編寫Emqx的ExHook接口,配置gRPC服務,可以接收到相關資訊

19個鈎子服務

如何編寫Emqx的ExHook接口,配置gRPC服務,可以接收到相關資訊

當啟動鈎子,裝置上下線的時候可以觸發gRPC服務的調用:

4,建立一個裝置,當裝置上線下線,展示消息日志

當鈎子啟用的觸發的消息:

jsonOnProviderLoaded: broker {
  version: "5.1.2"
  sysdescr: "EMQX"
  uptime: 35319623
  datetime: "2023-07-30T10:32:07.659816096+00:00"
}
meta {
  node: "[email protected]"
  version: "5.1.2"
  sysdescr: "EMQX"
  cluster_name: "emqxcl"
}

           

當裝置連接配接出發的消息:

jsonOnClientConnect: conninfo {
  node: "[email protected]"
  clientid: "emqx_OTIxNz"
  peerhost: "172.18.0.1"
  sockport: 8083
  proto_name: "MQTT"
  proto_ver: "5"
  keepalive: 60
}
props {
  name: "Session-Expiry-Interval"
  value: "0"
}
meta {
  node: "[email protected]"
  version: "5.1.2"
  sysdescr: "EMQX"
  cluster_name: "emqxcl"
}
           

當裝置鑒權觸發資訊:

jsonOnClientAuthenticate: clientinfo {
  node: "[email protected]"
  clientid: "emqx_OTIxNz"
  peerhost: "172.18.0.1"
  sockport: 8083
  protocol: "mqtt"
  anonymous: true
}
meta {
  node: "[email protected]"
  version: "5.1.2"
  sysdescr: "EMQX"
  cluster_name: "emqxcl"
}

           

當裝置連接配接成功:

jsonOnClientConnected: clientinfo {
  node: "[email protected]"
  clientid: "emqx_OTIxNz"
  peerhost: "172.18.0.1"
  sockport: 8083
  protocol: "mqtt"
  anonymous: true
}
meta {
  node: "[email protected]"
  version: "5.1.2"
  sysdescr: "EMQX"
  cluster_name: "emqxcl"
}
           

當消息發送觸發資訊:

bashOnMessageDelivered: clientinfo {
  node: "[email protected]"
  clientid: "emqx_OTIxNz"
  peerhost: "172.18.0.1"
  sockport: 8083
  protocol: "mqtt"
  anonymous: true
}
message {
  node: "[email protected]"
  id: "000601B1E4DB27A06EE4010006430002"
  from: "emqx_OTIxNz"
  topic: "testtopic/1"
  payload: "{ \"msg\": \"hello\" }"
  timestamp: 1690713420670
  headers {
    key: "peerhost"
    value: "172.18.0.1"
  }
  headers {
    key: "protocol"
    value: "mqtt"
  }
  headers {
    key: "username"
    value: ""
  }
}
meta {
  node: "[email protected]"
  version: "5.1.2"
  sysdescr: "EMQX"
  cluster_name: "emqxcl"
}

           

其他還有很多消息,隻是簡單的看了幾個消息。

5,總結和ActorCloud資料進行更新

剩下的事情就是把資料入庫了。找到那個裝置,然後把裝置的線上狀态更新下:

postgresql:

sqlupdate devices set "deviceStatus" = 1 where "deviceName" =  '11111111' ;
           

然後裝置的線上狀态就同步了;

如何編寫Emqx的ExHook接口,配置gRPC服務,可以接收到相關資訊

繼續把同步代碼補齊,然後連接配接資料庫,修改資料。

更多文章:

fly-iot飛凡物聯專欄: https://blog.csdn.net/freewebsys/category_12219758.html

繼續閱讀