天天看點

我的mqtt協定和emqttd開源項目個人了解(23) - websocket用戶端連接配接過程分析(Wireshark抓包+源碼分析)

我們可以使用emq自帶的Dashboard插件,進行websocket調試,打開谷歌浏覽器輸入網址,其中192.168.83.128是emq所在的IP位址: http://192.168.83.128:18083/#/websocket

使用者名:admin,密碼:public

WebSocket URI:ws(s)://192.168.83.128:8083/mqtt

TCP URI:tcp://192.168.83.128:1883

一、測試實踐

測試環境:使用MQTT 3.1.1版本協定,Wireshark 2.4.1版本,使用wireshark抓包分析(如果是虛拟機,要抓VMnet8虛拟網卡)。

clientid:861694030142473

username:libaineu2004

password:12345678

cleanSession:true

keepalive:60s

1、測試方案1,使用普通的tcp連接配接,TCP URI:tcp://192.168.83.128:1883,wireshark過濾條件tcp.port == 1883。

(1)、MQTT Connect Command

滑鼠點選wireshark的Frame頁面,右鍵菜單,複制,選中樹的所有可見項目

MQ Telemetry Transport Protocol, Connect Command

   Header Flags: 0x10 (Connect Command)

   Msg Len: 51

   Protocol Name Length: 4

   Protocol Name: MQTT

   Version: 4

   Connect Flags: 0xc2

   Keep Alive: 60

   Client ID Length: 15

   Client ID: 861694030142473

   User Name Length: 12

   User Name: libaineu2004

   Password Length: 8

   Password: 12345678

右鍵菜單,複制,位元組為HEX + ASCII轉儲

0000   10 33 00 04 4d 51 54 54 04 c2 00 3c 00 0f 38 36  .3..MQTT...<..86

0010   31 36 39 34 30 33 30 31 34 32 34 37 33 00 0c 6c  1694030142473..l

0020   69 62 61 69 6e 65 75 32 30 30 34 00 08 31 32 33  ibaineu2004..123

0030   34 35 36 37 38                                   45678

(2)、MQTT Connect Ack

MQ Telemetry Transport Protocol, Connect Ack

   Header Flags: 0x20 (Connect Ack)

   Msg Len: 2

   Acknowledge Flags: 0x00

   Return Code: Connection Accepted (0)

0000   20 02 00 00                                       ...

2、測試方案2,使用websocket連接配接,WebSocket URI:ws(s)://192.168.83.128:8083/mqtt,wireshark過濾條件tcp.port == 8083。我們可以觀察到mqtt連接配接的過程,websocket協定,完整的資料被拆分了好幾幀傳輸。每一幀封包,都有WebSocket和Data字段。我們僅僅需要關注Data字段即可。我們發現,把所有Data字段組合拼接起來,就是和測試方案1完全相同的資料。MQTT Connect Command和Ack都完全相同。說明了即使使用了Websocket方式來傳輸,仍然遵循mqtt協定。

從本質上來講,Websocket也是基于 TCP 協定的,同時借用了HTTP的協定來完成一部分握手。

主要解決 HTTP 協定中一個 request 對應一個 response 的尴尬。(http server 不能主動發送消息給 http client) 。通過 HTTP 完成 websocket 的握手過程,接着按照 websocket 協定進行通訊。

websocket 也有他自己的資料幀格式:

http://blog.csdn.net/u010487568/article/details/20569027

(1)MQTT Connect Command

0000   10                                               .

0000   33                                               3

0000   00 04                                            ..

0000   4d 51 54 54                                      MQTT

0000   04                                               .

0000   c2                                               .

0000   00 3c                                            .<

0000   00 0f                                            ..

0000   38 36 31 36 39 34 30 33 30 31 34 32 34 37 33     861694030142473

0000   00 0c                                            ..

0000   6c 69 62 61 69 6e 65 75 32 30 30 34              libaineu2004

0000   00 08                                            ..

0000   31 32 33 34 35 36 37 38                          12345678

(2)MQTT Connect Ack

Data (4 bytes)

   Data: 20020000

   [Length: 4]

二、以下是源碼分析,先來看emq v1.1.3版本的源碼:

1、-module(emqttd_app).

%%--------------------------------------------------------------------

%% Start Servers

start_servers(Sup) ->

   Servers = [

              {"emqttd wsclient supervisor", {supervisor, emqttd_ws_client_sup}},

%% Start http listener

start_listener({http, ListenOn, Opts}) ->

   mochiweb:start_http(http, ListenOn, Opts, {emqttd_http, handle_request, []});

打開8083端口,開啟http伺服器。Websocket也是基于 TCP 協定的,同時借用了HTTP的協定來完成一部分握手。

2、-module(emqttd_http).

%% MQTT Over WebSocket

handle_request('GET', "/mqtt", Req) ->

   lager:info("WebSocket Connection from: ~s", [Req:get(peer)]),

   Upgrade = Req:get_header_value("Upgrade"),

   Proto   = Req:get_header_value("Sec-WebSocket-Protocol"),

   case {is_websocket(Upgrade), Proto} of

       {true, "mqtt" ++ _Vsn} ->

           emqttd_ws:handle_request(Req);

       {false, _} ->

           lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]),

           Req:respond({400, [], <<"Bad Request">>});

       {_, Proto} ->

           lager:error("WebSocket with error Protocol: ~s", [Proto]),

           Req:respond({400, [], <<"Bad WebSocket Protocol">>})

   end;

websocket的入口函數。重點關注emqttd_ws:handle_request(Req);

3、-module(emqttd_ws).

ws_loop(Data, State = #wsocket_state{peer = Peer, client_pid = ClientPid,
                                     parser_fun = ParserFun}, ReplyChannel) ->
    ?WSLOG(debug, Peer, "RECV ~p", [Data]),
    case catch ParserFun(iolist_to_binary(Data)) of
        {more, NewParser} ->
            State#wsocket_state{parser_fun = NewParser};
        {ok, Packet, Rest} ->
            gen_server:cast(ClientPid, {received, Packet}),
            ws_loop(Rest, reset_parser(State), ReplyChannel);
        {error, Error} ->
            ?WSLOG(error, Peer, "Frame error: ~p", [Error]),
            exit({shutdown, Error});
        {'EXIT', Reason} ->
            ?WSLOG(error, Peer, "Frame error: ~p", [Reason]),
            ?WSLOG(error, Peer, "Error data: ~p", [Data]),
            exit({shutdown, parser_error})
    end.      

gen_server:cast(ClientPid, {received, Packet}), 接收來自用戶端的消息。

4、-module(emqttd_ws_client).

handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) ->
    case emqttd_protocol:received(Packet, ProtoState) of
        {ok, ProtoState1} ->
            noreply(State#wsclient_state{proto_state = ProtoState1});
        {error, Error} ->
            ?WSLOG(error, Peer, "Protocol error - ~p", [Error]),
            shutdown(Error, State);
        {error, Error, ProtoState1} ->
            shutdown(Error, State#wsclient_state{proto_state = ProtoState1});
        {stop, Reason, ProtoState1} ->
            stop(Reason, State#wsclient_state{proto_state = ProtoState1})
    end;      

emqttd_protocol:received(Packet, ProtoState)。接收處理消息。

5、-module(emqttd_protocol).

process(Packet = ?CONNECT_PACKET(Var), State0) ->
    #mqtt_packet_connect{proto_ver  = ProtoVer,
                         proto_name = ProtoName,
                         username   = Username,
                         password   = Password,
                         clean_sess = CleanSess,
                         keep_alive = KeepAlive,
                         client_id  = ClientId} = Var,
    State1 = State0#proto_state{proto_ver  = ProtoVer,
                                proto_name = ProtoName,
                                username   = Username,
                                client_id  = ClientId,
                                clean_sess = CleanSess,
                                keepalive  = KeepAlive,
                                will_msg   = willmsg(Var),
                                connected_at = os:timestamp()},
    trace(recv, Packet, State1),
    {ReturnCode1, SessPresent, State3} =
    case validate_connect(Var, State1) of      

validate_connect(Var, State1),校驗clientid,username和password的有效性。

繼續閱讀