天天看點

我的mqtt協定和emqttd開源項目個人了解(12) - Hook使用和連接配接Kafka發送消息,使用ekaf庫

一、工作環境準備

erlang kafka用戶端庫使用的是

https://github.com/helpshift/ekaf emq使用的是v2.3.5版本, https://github.com/emqtt/emq-relx kafka的運作環境準備, http://blog.csdn.net/libaineu2004/article/details/79202408

我們以插件的形式來實作,我的插件路徑是/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps

copy一份emq_plugin_template,并更名為emq_plugin_kafka_ekaf,注意相關配置檔案和源檔案都要更名。

進入目錄/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_ekaf

Makefile檔案,新增紅色文字

BUILD_DEPS = emqttd cuttlefish ekaf

dep_ekaf = git

https://github.com/helpshift/ekaf.git

master

二、工程檔案修改

1、進入目錄/home/firecat/Prj/emq2.0/emq-relx-2.3.5/Makefile 檔案

Makefile 增加

DEPS += emq_plugin_kafka_ekaf

2、relx.config 中 release 段落添加這三行:

kafkamocker,

ekaf,

{emq_plugin_kafka_ekaf, load}

3、/home/firecat/Prj/emq2.0/emq-relx-2.3.5/data/loaded_plugins設定自啟動插件

emq_recon.

emq_modules.

emq_retainer.

emq_dashboard.

emq_plugin_kafka_ekaf.

三、源碼實作

1、/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_ekaf/etc/emq_plugin_kafka_ekaf.config

[
  {emq_plugin_kafka_ekaf, [
    {kafka, [
      {bootstrap_broker, {"127.0.0.1", 9092} }, %%不能使用"localhost"
      {partition_strategy, strict_round_robin}
    ]}
  ]}
].      

2、/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_ekaf/src/emq_plugin_kafka_ekaf.erl

%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
 
-module(emq_plugin_kafka_ekaf).
 
-include_lib("emqttd/include/emqttd.hrl").
 
-define(TEST_TOPIC,<<"emq_broker_message">>).
 
-export([load/1, unload/0]).
 
%% Hooks functions
 
-export([on_client_connected/3, on_client_disconnected/3]).
 
-export([on_client_subscribe/4, on_client_unsubscribe/4]).
 
-export([on_session_created/3, on_session_subscribed/4, on_session_unsubscribed/4, on_session_terminated/4]).
 
-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).
 
%% Called when the plugin application start
load(Env) ->
    ekaf_init([Env]),
    emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
    emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
    emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),
    emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),
    emqttd:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
    emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
    emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
    emqttd:hook('session.terminated', fun ?MODULE:on_session_terminated/4, [Env]),
    emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
    emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),
    emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).
 
on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId}, _Env) ->
    io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]),
    Json = mochijson2:encode([
        {type, <<"connected">>},
        {client_id, ClientId},
        {cluster_node, node()},
        {ts, emqttd_time:now_ms()}
    ]),
    
    %%ekaf:produce_async_batched(?TEST_TOPIC, list_to_binary(Json)),批量發送,不建議使用,我發現kafka接收會滞後
    ekaf:produce_async(?TEST_TOPIC, list_to_binary(Json)),
    ekaf:produce_async(?TEST_TOPIC, {<<"mykey_1">>, list_to_binary(Json)}),
    {ok, Client}.
 
on_client_disconnected(Reason, _Client = #mqtt_client{client_id = ClientId}, _Env) ->
    io:format("client ~s disconnected, reason: ~w~n", [ClientId, Reason]),
    Json = mochijson2:encode([
        {type, <<"disconnected">>},
        {client_id, ClientId},
        {reason, Reason},
        {cluster_node, node()},
        {ts, emqttd_time:now_ms()}
    ]),
 
    %%ekaf:produce_async_batched(?TEST_TOPIC, list_to_binary(Json)),批量發送,不建議使用,我發現kafka接收會滞後
    ekaf:produce_async(?TEST_TOPIC, list_to_binary(Json)),
    ekaf:produce_async(?TEST_TOPIC, {<<"mykey_2">>, list_to_binary(Json)}),
    ok.
 
on_client_subscribe(ClientId, Username, TopicTable, _Env) ->
    io:format("client(~s/~s) will subscribe: ~p~n", [Username, ClientId, TopicTable]),
    {ok, TopicTable}.
    
on_client_unsubscribe(ClientId, Username, TopicTable, _Env) ->
    io:format("client(~s/~s) unsubscribe ~p~n", [ClientId, Username, TopicTable]),
    {ok, TopicTable}.
 
on_session_created(ClientId, Username, _Env) ->
    io:format("session(~s/~s) created.", [ClientId, Username]).
 
on_session_subscribed(ClientId, Username, {Topic, Opts}, _Env) ->
    io:format("session(~s/~s) subscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
    {ok, {Topic, Opts}}.
 
on_session_unsubscribed(ClientId, Username, {Topic, Opts}, _Env) ->
    io:format("session(~s/~s) unsubscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
    ok.
 
on_session_terminated(ClientId, Username, Reason, _Env) ->
    io:format("session(~s/~s) terminated: ~p.", [ClientId, Username, Reason]).
 
%% transform message and return
on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Env) ->
    {ok, Message};
 
on_message_publish(Message, _Env) ->
    io:format("publish ~s~n", [emqttd_message:format(Message)]),
    
    Id = Message#mqtt_message.id,
    From = Message#mqtt_message.from, %需要登入和不需要登入這裡的傳回值是不一樣的
    Topic = Message#mqtt_message.topic,
    Payload = Message#mqtt_message.payload,
    Qos = Message#mqtt_message.qos,
    Dup = Message#mqtt_message.dup,
    Retain = Message#mqtt_message.retain,
    Timestamp = Message#mqtt_message.timestamp,
 
    ClientId = c(From),
    Username = u(From),
 
    Json = mochijson2:encode([
                  {type, <<"publish">>},
                  {client_id, ClientId},
                  {message, [
                     {username, Username},
                     {topic, Topic},
                     {payload, Payload},
                     {qos, i(Qos)},
                     {dup, i(Dup)},
                     {retain, i(Retain)}
                    ]},
                  {cluster_node, node()},
                  {ts, emqttd_time:now_ms()}
                 ]),
 
    %%ekaf:produce_async_batched(?TEST_TOPIC, list_to_binary(Json)),批量發送,不建議使用,我發現kafka接收會滞後
    ekaf:produce_async(?TEST_TOPIC, list_to_binary(Json)),
 
    {ok, Message}.
 
on_message_delivered(ClientId, Username, Message, _Env) ->
    io:format("delivered to client(~s/~s): ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
    {ok, Message}.
 
on_message_acked(ClientId, Username, Message, _Env) ->
    io:format("client(~s/~s) acked: ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
    {ok, Message}.
 
%% Called when the plugin application stop
unload() ->
    emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),
    emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3),
    emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4),
    emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4),
    emqttd:unhook('session.created', fun ?MODULE:on_session_created/3),
    emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4),
    emqttd:unhook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4),
    emqttd:unhook('session.terminated', fun ?MODULE:on_session_terminated/4),
    emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
    emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4),
    emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4).
 
%% ===================================================================
%% ekaf_init https://github.com/helpshift/ekaf
%% ===================================================================
ekaf_init(_Env) ->
    %% Start
    application:load(ekaf),
    %% Get parameters
    {ok, Kafka} = application:get_env(?MODULE, kafka),
    BootstrapBroker = proplists:get_value(bootstrap_broker, Kafka),
    PartitionStrategy= proplists:get_value(partition_strategy, Kafka),
    %% Set partition strategy, like application:set_env(ekaf, ekaf_partition_strategy, strict_round_robin),
    application:set_env(ekaf, ekaf_partition_strategy, PartitionStrategy),
    %% Set broker url and port, like application:set_env(ekaf, ekaf_bootstrap_broker, {"127.0.0.1", 9092}),
    application:set_env(ekaf, ekaf_bootstrap_broker, BootstrapBroker),
    %% Set topic
    application:set_env(ekaf, ekaf_bootstrap_topics, ?TEST_TOPIC),
 
    {ok, _} = application:ensure_all_started(kafkamocker),
    {ok, _} = application:ensure_all_started(gproc),
    {ok, _} = application:ensure_all_started(ekaf),
 
    io:format("Init ekaf with ~p~n", [BootstrapBroker]).
 
i(true) -> 1;
i(false) -> 0;
i(I) when is_integer(I) -> I.
c({ClientId, Username}) -> ClientId;
c(From) -> From.
u({ClientId, Username}) -> Username;
u(From) -> From.

      

完整的源碼下載下傳位址:

https://download.csdn.net/download/libaineu2004/10281784

---

參考文章:

http://www.cnblogs.com/wunaozai/p/8249657.html https://github.com/msdevanms/emqttd_plugin_kafka_bridge