我們可以使用emq自帶的Dashboard插件,進行websocket調試,打開谷歌浏覽器輸入網址,其中192.168.83.128是emq所在的IP位址: http://192.168.83.128:18083/#/websocket
使用者名:admin,密碼:public
WebSocket URI:ws(s)://192.168.83.128:8083/mqtt
TCP URI:tcp://192.168.83.128:1883
一、測試實踐
測試環境:使用MQTT 3.1.1版本協定,Wireshark 2.4.1版本,使用wireshark抓包分析(如果是虛拟機,要抓VMnet8虛拟網卡)。
clientid:861694030142473
username:libaineu2004
password:12345678
cleanSession:true
keepalive:60s
1、測試方案1,使用普通的tcp連接配接,TCP URI:tcp://192.168.83.128:1883,wireshark過濾條件tcp.port == 1883。
(1)、MQTT Connect Command
滑鼠點選wireshark的Frame頁面,右鍵菜單,複制,選中樹的所有可見項目
MQ Telemetry Transport Protocol, Connect Command
Header Flags: 0x10 (Connect Command)
Msg Len: 51
Protocol Name Length: 4
Protocol Name: MQTT
Version: 4
Connect Flags: 0xc2
Keep Alive: 60
Client ID Length: 15
Client ID: 861694030142473
User Name Length: 12
User Name: libaineu2004
Password Length: 8
Password: 12345678
右鍵菜單,複制,位元組為HEX + ASCII轉儲
0000 10 33 00 04 4d 51 54 54 04 c2 00 3c 00 0f 38 36 .3..MQTT...<..86
0010 31 36 39 34 30 33 30 31 34 32 34 37 33 00 0c 6c 1694030142473..l
0020 69 62 61 69 6e 65 75 32 30 30 34 00 08 31 32 33 ibaineu2004..123
0030 34 35 36 37 38 45678
(2)、MQTT Connect Ack
MQ Telemetry Transport Protocol, Connect Ack
Header Flags: 0x20 (Connect Ack)
Msg Len: 2
Acknowledge Flags: 0x00
Return Code: Connection Accepted (0)
0000 20 02 00 00 ...
2、測試方案2,使用websocket連接配接,WebSocket URI:ws(s)://192.168.83.128:8083/mqtt,wireshark過濾條件tcp.port == 8083。我們可以觀察到mqtt連接配接的過程,websocket協定,完整的資料被拆分了好幾幀傳輸。每一幀封包,都有WebSocket和Data字段。我們僅僅需要關注Data字段即可。我們發現,把所有Data字段組合拼接起來,就是和測試方案1完全相同的資料。MQTT Connect Command和Ack都完全相同。說明了即使使用了Websocket方式來傳輸,仍然遵循mqtt協定。
從本質上來講,Websocket也是基于 TCP 協定的,同時借用了HTTP的協定來完成一部分握手。
主要解決 HTTP 協定中一個 request 對應一個 response 的尴尬。(http server 不能主動發送消息給 http client) 。通過 HTTP 完成 websocket 的握手過程,接着按照 websocket 協定進行通訊。
websocket 也有他自己的資料幀格式:
http://blog.csdn.net/u010487568/article/details/20569027(1)MQTT Connect Command
0000 10 .
0000 33 3
0000 00 04 ..
0000 4d 51 54 54 MQTT
0000 04 .
0000 c2 .
0000 00 3c .<
0000 00 0f ..
0000 38 36 31 36 39 34 30 33 30 31 34 32 34 37 33 861694030142473
0000 00 0c ..
0000 6c 69 62 61 69 6e 65 75 32 30 30 34 libaineu2004
0000 00 08 ..
0000 31 32 33 34 35 36 37 38 12345678
(2)MQTT Connect Ack
Data (4 bytes)
Data: 20020000
[Length: 4]
二、以下是源碼分析,先來看emq v1.1.3版本的源碼:
1、-module(emqttd_app).
%%--------------------------------------------------------------------
%% Start Servers
start_servers(Sup) ->
Servers = [
{"emqttd wsclient supervisor", {supervisor, emqttd_ws_client_sup}},
%% Start http listener
start_listener({http, ListenOn, Opts}) ->
mochiweb:start_http(http, ListenOn, Opts, {emqttd_http, handle_request, []});
打開8083端口,開啟http伺服器。Websocket也是基于 TCP 協定的,同時借用了HTTP的協定來完成一部分握手。
2、-module(emqttd_http).
%% MQTT Over WebSocket
handle_request('GET', "/mqtt", Req) ->
lager:info("WebSocket Connection from: ~s", [Req:get(peer)]),
Upgrade = Req:get_header_value("Upgrade"),
Proto = Req:get_header_value("Sec-WebSocket-Protocol"),
case {is_websocket(Upgrade), Proto} of
{true, "mqtt" ++ _Vsn} ->
emqttd_ws:handle_request(Req);
{false, _} ->
lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]),
Req:respond({400, [], <<"Bad Request">>});
{_, Proto} ->
lager:error("WebSocket with error Protocol: ~s", [Proto]),
Req:respond({400, [], <<"Bad WebSocket Protocol">>})
end;
websocket的入口函數。重點關注emqttd_ws:handle_request(Req);
3、-module(emqttd_ws).
ws_loop(Data, State = #wsocket_state{peer = Peer, client_pid = ClientPid,
parser_fun = ParserFun}, ReplyChannel) ->
?WSLOG(debug, Peer, "RECV ~p", [Data]),
case catch ParserFun(iolist_to_binary(Data)) of
{more, NewParser} ->
State#wsocket_state{parser_fun = NewParser};
{ok, Packet, Rest} ->
gen_server:cast(ClientPid, {received, Packet}),
ws_loop(Rest, reset_parser(State), ReplyChannel);
{error, Error} ->
?WSLOG(error, Peer, "Frame error: ~p", [Error]),
exit({shutdown, Error});
{'EXIT', Reason} ->
?WSLOG(error, Peer, "Frame error: ~p", [Reason]),
?WSLOG(error, Peer, "Error data: ~p", [Data]),
exit({shutdown, parser_error})
end.
gen_server:cast(ClientPid, {received, Packet}), 接收來自用戶端的消息。
4、-module(emqttd_ws_client).
handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) ->
case emqttd_protocol:received(Packet, ProtoState) of
{ok, ProtoState1} ->
noreply(State#wsclient_state{proto_state = ProtoState1});
{error, Error} ->
?WSLOG(error, Peer, "Protocol error - ~p", [Error]),
shutdown(Error, State);
{error, Error, ProtoState1} ->
shutdown(Error, State#wsclient_state{proto_state = ProtoState1});
{stop, Reason, ProtoState1} ->
stop(Reason, State#wsclient_state{proto_state = ProtoState1})
end;
emqttd_protocol:received(Packet, ProtoState)。接收處理消息。
5、-module(emqttd_protocol).
process(Packet = ?CONNECT_PACKET(Var), State0) ->
#mqtt_packet_connect{proto_ver = ProtoVer,
proto_name = ProtoName,
username = Username,
password = Password,
clean_sess = CleanSess,
keep_alive = KeepAlive,
client_id = ClientId} = Var,
State1 = State0#proto_state{proto_ver = ProtoVer,
proto_name = ProtoName,
username = Username,
client_id = ClientId,
clean_sess = CleanSess,
keepalive = KeepAlive,
will_msg = willmsg(Var),
connected_at = os:timestamp()},
trace(recv, Packet, State1),
{ReturnCode1, SessPresent, State3} =
case validate_connect(Var, State1) of
validate_connect(Var, State1),校驗clientid,username和password的有效性。