天天看點

《cowboy 源代碼分析第一部 (Erlang實作的http伺服器)》

cowboy是基于ranch的http伺服器。特點是功能強打(支援完整的http協定websocket,spdy等),簡潔,輕量級。

項目的連接配接再這裡

https://github.com/extend/cowboy。

首先回顧一下Http 協定

在shell下 curl

http://127.0.0.1:8080

, 同時用tcpdump抓包可以看到一個最簡單的Http的Get請求。

GET / HTTP1.1 r\n
User-Agent: curl17.36.0 r\n
Host: 127.0.0.1:8080 r\n
Accept: text/plain r\n
r\n      

http request由3部分組成:請求行,消息報頭,請求正文。

1) 請求行:就是第一行,由http的方法開始,由空格分割。後面是url路徑,http的版本。

2) 後面的User-Agent,Host,Accept都是消息報頭。

3) Accept用于RESTFul。

4) 請求正文為空。

post請求

post請求用于請求表單,表單的内容就是消息正文。
POST /reg.jsp HTTP/ (CRLF)
Accept:image/gif,image/x-xbit,... (CRLF)
...
HOST:www.guet.edu.cn (CRLF)
Content-Length:22 (CRLF)
Connection:Keep-Alive (CRLF)
Cache-Control:no-cache (CRLF)
(CRLF)         //該CRLF表示消息報頭已經結束,在此之前為消息報頭
user=jeffrey&pwd=1234  //此行以下為送出的資料      

cowboy的入口

%% hello_world_app.erl
    Dispatch = cowboy_router:compile([
     {'_', [
         {"/", toppage_handler, []}
     ]}
 ]),
 {ok, _} = cowboy:start_http(http, 100, [{port, 8080}], [
     {env, [{dispatch, Dispatch}]}
 ]),      

使用者需要提供一個url轉發規則Dispatch。

然後啟動cowboy:start_http

cowboy是如何啟動的?

start_https(Ref, NbAcceptors, TransOpts, ProtoOpts)
     when is_integer(NbAcceptors), NbAcceptors > 0 ->
 ranch:start_listener(Ref, NbAcceptors,
     ranch_ssl, TransOpts, cowboy_protocol, ProtoOpts).      

可以看到cowboy是運作在ranch上,是以需要提供一個回調函數:cowboy_protocol。所有http request的請求都是這個子產品處理。

一旦用戶端的浏覽器和cowboy的伺服器建立連接配接,ranch的acceptor連接配接池會把這個連接配接轉交給一個cowboy_protocol的子程序。

而使用者設定的dispatch是放在ProtoOpts裡。

cowboy_protocol是如何啟動的?

%% cowboy_protocol.erl
start_link(Ref, Socket, Transport, Opts) ->
 Pid = spawn_link(?MODULE, init, [Ref, Socket, Transport, Opts]),
 {ok, Pid}.      

ranch啟動子程序處理連接配接的時候,會調用子產品的start_link函數,也就是cowboy_protocol:start_link。

進一步調用cowboy_protocol:init

init(Ref, Socket, Transport, Opts) ->
 Compress = get_value(compress, Opts, false),
 MaxEmptyLines = get_value(max_empty_lines, Opts, 5),
 MaxHeaderNameLength = get_value(max_header_name_length, Opts, 64),
 MaxHeaderValueLength = get_value(max_header_value_length, Opts, 4096),
 MaxHeaders = get_value(max_headers, Opts, 100),
 MaxKeepalive = get_value(max_keepalive, Opts, 100),
 MaxRequestLineLength = get_value(max_request_line_length, Opts, 4096),
 Middlewares = get_value(middlewares, Opts, [cowboy_router, cowboy_handler]),
 Env = [{listener, Ref}|get_value(env, Opts, [])],
 OnRequest = get_value(onrequest, Opts, undefined),
 OnResponse = get_value(onresponse, Opts, undefined),
 Timeout = get_value(timeout, Opts, 5000),
 ok = ranch:accept_ack(Ref),
 wait_request(<<>>, #state{socket=Socket, transport=Transport,
     middlewares=Middlewares, compress=Compress, env=Env,
     max_empty_lines=MaxEmptyLines, max_keepalive=MaxKeepalive,
     max_request_line_length=MaxRequestLineLength,
     max_header_name_length=MaxHeaderNameLength,
     max_header_value_length=MaxHeaderValueLength, max_headers=MaxHeaders,
     onrequest=OnRequest, onresponse=OnResponse,
     timeout=Timeout, until=until(Timeout)}, 0).      

這個函數主要是擷取一些限制的參數,比如:Header的最大長度。

Middlewares 是’中間層‘處理子產品。這個子產品的調用時機是解析完Http request。使用者可以指定,預設值是cowboy_router和cowboy_handler。

MaxKeepalive 是保持連接配接的最長時間。

wait_request 一邊接收資料一邊解析。

注意,wait_request參數描述了它的狀态。一個空的Buffer,一個State,和初始化為0的ReqEmpty。

開始解析Http Request

對于http伺服器來說主要工作就是:解析http request,然後根據請求中的url,和使用者設定的轉發規則,調用相應的處理子產品。最後,把處理結果傳回。

接收資料

wait_request(Buffer, State=#state{socket=Socket, transport=Transport,
     until=Until}, ReqEmpty) ->
 case recv(Socket, Transport, Until) of
     {ok, Data} ->
         parse_request(<< Buffer/binary, Data/binary >>, State, ReqEmpty);
     {error, _} ->
         terminate(State)
 end.      

wait_request 僅僅是接收資料。隻要收到一點資料就調用parse_request進行解析。

并不是收到完整的Http Request Header才開始解析。這樣收到一點資料就解析一點資料效率更好。

另外,注意:本次收到得資料Data是append在Buffer後面的。因為,流式分析可能一次收到的資料不是完整的一行,需要再次進入這個函數,接收資料。

狀态機

整個Http Request Header的解析就是一個狀态機。起點是parse_request,終點就是解析完畢調用使用者的回調。

《cowboy 源代碼分析第一部 (Erlang實作的http伺服器)》

請求行的解析

開始解析http請求包的第一行,也就是請求行。      

狀态機起點:parse_request。

parse_request(Buffer, State=#state{max_request_line_length=MaxLength,
     max_empty_lines=MaxEmpty}, ReqEmpty) ->
 case match_eol(Buffer, 0) of
     nomatch when byte_size(Buffer) > MaxLength ->
         error_terminate(414, State);
     nomatch ->
         wait_request(Buffer, State, ReqEmpty);
     1 when ReqEmpty =:= MaxEmpty ->
         error_terminate(400, State);
     1 ->
         << _:16, Rest/binary >> = Buffer,
         parse_request(Rest, State, ReqEmpty + 1);
     _ ->
         parse_method(Buffer, State, <<>>)
 end.      

Buffer是收到的資料,首先調用match_eol順序周遊找換行$n的位置。

如果是nomatch,有兩種情況:

1) nomatch when byte_size(Buffer) > MaxLength 長度大于最大長度,傳回414.

2) nomatch, 這個時候需要再接收一點資料,再次進入wait_request。

如果傳回1,說明是一個空行。

如果傳回非nomach,非1,則說明目前Buffer裡至少有完整一行的資料。

态機進入parse_method

這裡的method就是http的get, put, delete, post。      
parse_method(<< C, Rest/bits >>, State, SoFar) ->
             case C of
                     $r -> error_terminate(400, State);
                     $s -> parse_uri(Rest, State, SoFar);
                     _ -> parse_method(Rest, State, << SoFar/binary, C >>)
             end.      
method和後面的uri的分鬲符是空格$s。
 parse_method解析的過程是:對參數進行'Head|Tail'方式綁定。然後判斷Head是不是想要的。      

1) 如果不是,則參數用Tail,把Header用最後一個參數儲存起來。parse_method(Rest, State, << SoFar/binary, C >>)

2_ 如果比對上了,則進入下一個階段的解析。parse_uri(Rest, State, SoFar)。

狀态機進入parse_uri

這裡的uri就是請求資源的uri。      
parse_uri(<< $r, _/bits >>, State, _) ->
            error_terminate(400, State);
    parse_uri(<< "# ", Rest/bits >>, State, Method) ->
            parse_version(Rest, State, Method, <<"#">>, <<>>);
    parse_uri(<< "http://", Rest/bits >>, State, Method) ->
            parse_uri_skip_host(Rest, State, Method);
    parse_uri(<< "https://", Rest/bits >>, State, Method) ->
            parse_uri_skip_host(Rest, State, Method);
    parse_uri(Buffer, State, Method) ->
            parse_uri_path(Buffer, State, Method, <<>>).      
如果Http Request Header裡面的uri部分是#,直接進入下一階段的解析:parse_version ;
 如果是以'http://'或'https://'開頭的,要跳過主機名:parse_uri_skip_host ;
 最後的情況就是正常的uri,調用parse_uri_path。
 注意:parse_uri的第3參數Method,是上一個狀态parse_method的傳回結果。      
parse_uri_path(<< C, Rest/bits >>, State, Method, SoFar) ->
            case C of
                    $r -> error_terminate(400, State);
                    $s -> parse_version(Rest, State, Method, SoFar, <<>>);
                    $? -> parse_uri_query(Rest, State, Method, SoFar, <<>>);
                    $# -> skip_uri_fragment(Rest, State, Method, SoFar, <<>>);
                    _ -> parse_uri_path(Rest, State, Method, << SoFar/binary, C >>)
            end.      
parse_uri_path的方式和parse_method的方式一樣:取出頭一個位元組,判斷是否比對上了$s。
 1) 如果沒有比對,把這個位元組儲存在最後一個參數SoFar後面。
 2) 如果比對上了,進入下一階段的解析。同時最後一個參數SoFar就是這個階段的解析戰果。      

狀态機進入parse_version

這裡的version就是http的版本。      
parse_version(<< "HTTP/1.1r\n", Rest/bits >>, S, M, P, Q) ->
             parse_header(Rest, S, M, P, Q, 'HTTP/1.1', []);
     parse_version(<< "HTTP/1.0r\n", Rest/bits >>, S, M, P, Q) ->
             parse_header(Rest, S, M, P, Q, 'HTTP/1.0', []);
     parse_version(_, State, _, _, _) ->
             error_terminate(505, State).      
parse_version的方式是直接進行字元串比對了。因為version要麼是"HTTP/1.1r\n", 要麼是"HTTP/1.0\r\n"。
 比對上了後,進入下一階段的解析,同時把本階段的解析戰果當作參數往後面傳遞。
 至此,已經解析完了一個完整的一行。      

請求頭的解析

前面幾個狀态已經解析完了http的請求行。
下面開始解析http的請求頭。      

狀态機進入parser_header

請求頭的格式是分号分割的kv。      
#+BEGIN_SRC erlang
     parse_header(Buffer, State=#state{max_header_name_length=MaxLength},
        M, P, Q, V, H) ->
    case match_colon(Buffer, 0) of
        nomatch when byte_size(Buffer) > MaxLength ->
            error_terminate(400, State);
        nomatch ->
            wait_header(Buffer, State, M, P, Q, V, H);
        _ ->
            parse_hd_name(Buffer, State, M, P, Q, V, H, <<>>)
    end.
     #+END_SRC      
首先match_colon試圖比對':',如果沒有,則進入wait_header,繼續接收資料。
 如果比對上了,則進入parse_hd_name。
 注意:M, P, Q, V, H都是已經解析出來的戰果。
 
 看一下wait_header如何接收資料的。      
#+BEGIN_SRC erlang
     wait_header(Buffer, State=#state{socket=Socket, transport=Transport,
        until=Until}, M, P, Q, V, H) ->
    case recv(Socket, Transport, Until) of
        {ok, Data} ->
            parse_header(<< Buffer/binary, Data/binary >>,
                State, M, P, Q, V, H);
        {error, timeout} ->
            error_terminate(408, State);
        {error, _} ->
            terminate(State)
    end.
     #+END_SRC      
嘗試接收資料,一旦接收到一點資料就調用parse_header繼續解析請求頭。
 如之前所說,如果比對上了':' 進入解析parse_hd_name      

狀态機進入parse_hd_name

解析http請求頭裡的一行中':'前面的name。      
#+BEGIN_SRC erlang
     parse_hd_name(<< C, Rest/bits >>, S, M, P, Q, V, H, SoFar) ->
    case C of
        $: -> parse_hd_before_value(Rest, S, M, P, Q, V, H, SoFar);
        $s -> parse_hd_name_ws(Rest, S, M, P, Q, V, H, SoFar);
        $t -> parse_hd_name_ws(Rest, S, M, P, Q, V, H, SoFar);
        ?INLINE_LOWERCASE(parse_hd_name, Rest, S, M, P, Q, V, H, SoFar)
    end.
     #+END_SRC      
同樣的,在比對上了':'之後,對進入parse_hd_before_value,同時把SoFar傳入,作為本次解析的成果。
 舉個例子,解析'Accept: text/plain'
 此時的SoFar就是Accept。      
#+BEGIN_SRC erlang
     parse_hd_before_value(Buffer, State=#state{
        max_header_value_length=MaxLength}, M, P, Q, V, H, N) ->
    case match_eol(Buffer, 0) of
        nomatch when byte_size(Buffer) > MaxLength ->
            error_terminate(400, State);
        nomatch ->
            wait_hd_before_value(Buffer, State, M, P, Q, V, H, N);
        _ ->
            parse_hd_value(Buffer, State, M, P, Q, V, H, N, <<>>)
    end.
     #+END_SRC      
這個時候要比對是有一個CRLF了。如果沒有,就需要等待wait_hd_before_value。如果有了CRLF,就進入解析'text/plain'      

狀态機進入parse_hd_value

解析http 請求頭每一行的value部分。      
#+BEGIN_SRC erlang
     parse_hd_value(<< $r, Rest/bits >>, S, M, P, Q, V, Headers, Name, SoFar) ->
    case Rest of
        << $n >> ->
            wait_hd_value_nl(<<>>, S, M, P, Q, V, Headers, Name, SoFar);
        << $n, C, Rest2/bits >> when C =:= $\s; C =:= $\t ->
            parse_hd_value(Rest2, S, M, P, Q, V, Headers, Name,
                << SoFar/binary, C >>);
        << $n, Rest2/bits >> ->
            parse_header(Rest2, S, M, P, Q, V, [{Name, SoFar}|Headers])
    end;
     #+END_SRC      
比對到了CRLF,就解析完了一行,此時的{Name, SoFar}就是key和value。
 把這個元組放到最後一個參數。然後,再次進入parse_header,繼續解析剩下的頭。      

狀态機再次進入parse_hader

因為請求頭的格式是一樣的,解析方式也是一樣的。 是以這個地方再次遞歸進來。隻是最後一個參數多了一個上一次解析出來的元組。      
#+BEGIN_SRC
     parse_header(<< $r, $\n, Rest/bits >>, S, M, P, Q, V, Headers) ->
    request(Rest, S, M, P, Q, V, lists:reverse(Headers));
     parse_header(Buffer, State=#state{max_header_name_length=MaxLength},
        M, P, Q, V, H) ->
    case match_colon(Buffer, 0) of
        nomatch when byte_size(Buffer) > MaxLength ->
            error_terminate(400, State);
        nomatch ->
            wait_header(Buffer, State, M, P, Q, V, H);
        _ ->
            parse_hd_name(Buffer, State, M, P, Q, V, H, <<>>)
    end.
     #+END_SRC      
如果,目前Buffer的資料是$r, $\n說明,http的請求頭已經解析完畢。
 進入請求的處理。      

狀态機進入request

#+BEGIN_SRC erlang
     request(Buffer, State=#state{socket=Socket, transport=Transport,
        req_keepalive=ReqKeepalive, max_keepalive=MaxKeepalive,
        compress=Compress, onresponse=OnResponse},
        Method, Path, Query, Version, Headers, Host, Port) ->
    case Transport:peername(Socket) of
        {ok, Peer} ->
            Req = cowboy_req:new(Socket, Transport, Peer, Method, Path,
                Query, Version, Headers, Host, Port, Buffer,
                ReqKeepalive < MaxKeepalive, Compress, OnResponse),
            onrequest(Req, State);
        {error, _} -> %% Couldn't read the peer address; connection is gone.
            terminate(State)
    end.
     #+END_SRC      
首先檢查:Socket的狀态,生成一個cowboy_req的對象,然後進入onrequest。
    onrequest可以執行使用者定義的處理函數。如果沒有則執行預設的。執行的函數是execute。      
#+BEGIN_SRC
     execute(Req, State, Env, [Middleware|Tail]) ->
    case Middleware:execute(Req, Env) of
        {ok, Req2, Env2} ->
            execute(Req2, State, Env2, Tail);
        {suspend, Module, Function, Args} ->
            erlang:hibernate(?MODULE, resume,
                [State, Env, Tail, Module, Function, Args]);
        {halt, Req2} ->
            next_request(Req2, State, ok);
        {error, Code, Req2} ->
            error_terminate(Code, Req2, State)
    end.
     #+END_SRC      
可以看到Middleware|Tail,就是在init中初始化的。
     Middlewares = get_value(middlewares, Opts, [cowboy_router, cowboy_handler]),
     是以,這個時候會執行cowboy_router:execute。      
#+BEGIN_SRC
     execute(Req, Env) ->
    {_, Dispatch} = lists:keyfind(dispatch, 1, Env),
    [Host, Path] = cowboy_req:get([host, path], Req),
    case match(Dispatch, Host, Path) of
        {ok, Handler, HandlerOpts, Bindings, HostInfo, PathInfo} ->
            Req2 = cowboy_req:set_bindings(HostInfo, PathInfo, Bindings, Req),
            {ok, Req2, [{handler, Handler}, {handler_opts, HandlerOpts}|Env]};
        {error, notfound, host} ->
            {error, 400, Req};
        {error, badrequest, path} ->
            {error, 400, Req};
        {error, notfound, path} ->
            {error, 404, Req}
    end.
     #+END_SRC      
cowboy_router:execute中,調用match對使用者設定的轉發規則進行比對(如何比對的後續文章介紹),傳回對應的Handler。
    回到cowboy_protocol:execute,接下來執行cowboy_handler:execute。真正的開始調用使用者的處理函數了。      
#+BEGIN_SRC erlang
     handler_init(Req, State, Handler, HandlerOpts) ->
    Transport = cowboy_req:get(transport, Req),
    try Handler:init({Transport:name(), http}, Req, HandlerOpts) of
        {ok, Req2, HandlerState} ->
            handler_handle(Req2, State, Handler, HandlerState);
        {loop, Req2, HandlerState} ->
            handler_after_callback(Req2, State, Handler, HandlerState);
        {loop, Req2, HandlerState, hibernate} ->
            handler_after_callback(Req2, State#state{hibernate=true},
                Handler, HandlerState);
        {loop, Req2, HandlerState, Timeout} ->
            State2 = handler_loop_timeout(State#state{loop_timeout=Timeout}),
            handler_after_callback(Req2, State2, Handler, HandlerState);
        {loop, Req2, HandlerState, Timeout, hibernate} ->
            State2 = handler_loop_timeout(State#state{
                hibernate=true, loop_timeout=Timeout}),
            handler_after_callback(Req2, State2, Handler, HandlerState);
        {shutdown, Req2, HandlerState} ->
            terminate_request(Req2, State, Handler, HandlerState,
                {normal, shutdown});
        %% @todo {upgrade, transport, Module}
        {upgrade, protocol, Module} ->
            upgrade_protocol(Req, State, Handler, HandlerOpts, Module);
        {upgrade, protocol, Module, Req2, HandlerOpts2} ->
            upgrade_protocol(Req2, State, Handler, HandlerOpts2, Module)
    catch Class:Reason ->
        cowboy_req:maybe_reply(500, Req),
        erlang:Class([
            {reason, Reason},
            {mfa, {Handler, init, 3}},
            {stacktrace, erlang:get_stacktrace()},
            {req, cowboy_req:to_list(Req)},
            {opts, HandlerOpts}
        ])
    end.
     #+END_SRC      
先調用init初始化,即toppage_handler:init,然後根據傳回值進行處理。這裡是直接調用handler_handle。      
#+BEGIN_SRC erlang
     handler_handle(Req, State, Handler, HandlerState) ->
    try Handler:handle(Req, HandlerState) of
        {ok, Req2, HandlerState2} ->
            terminate_request(Req2, State, Handler, HandlerState2,
                {normal, shutdown})
    catch Class:Reason ->
        cowboy_req:maybe_reply(500, Req),
        handler_terminate(Req, Handler, HandlerState, Reason),
        erlang:Class([
            {reason, Reason},
            {mfa, {Handler, handle, 2}},
            {stacktrace, erlang:get_stacktrace()},
            {req, cowboy_req:to_list(Req)},
            {state, HandlerState}
        ])
    end.
     #+END_SRC      
終于開始執行toppage_handler:handle。      
#+BEGIN_SRC erlang
     handle(Req, State) ->
    {ok, Req2} = cowboy_req:reply(200, [
        {<<"content-type">>, <<"text/plain">>}
    ], <<"Hello world!">>, Req),
    {ok, Req2, State}.
     #+END_SRC      

總結

第一部先到這裡結束。主要跟蹤了一條http請求所需要經過的子產品。有些細節有待進一步分析:比如請求的reply,規則是如何快速比對上的。