天天看點

go微服務架構go-micro深度學習(五) stream 調用過程詳解

     上一篇寫了一下rpc調用過程的實作方式

,簡單來說就是服務端把實作了接口的結構體對象進行反射,抽取方法,簽名,儲存,用戶端調用的時候go-micro封請求資料,服務端接收到請求時,找到需要調用調用的對象和對應的方法,利用反射進行調用,傳回資料。 但是沒有說stream的實作方式,感覺單獨寫一篇文章來說這個更好一些。上一篇文章是基礎,了解了上一篇,stream實作原理一點即破。先說一下使用方式,再說原理。

目前go-micro對 rpc 調用的方式大概如下:

普通的rpc調用 是這樣:

1.連接配接伺服器或者從緩存池得到連接配接
2.用戶端 ->發送資料 -> 服務端接收
3.服務端 ->傳回資料 -> 用戶端處理資料
4.關閉連接配接或者把連接配接傳回到緩存池           

目前 rps stream的實作方式 是這樣子:

1. 連接配接伺服器
2. 用戶端多次發送請求-> 服務端接收
3. 服務端多次傳回資料-> 用戶端處理資料
4. 關閉連接配接           

    當資料量比較大的時候我們可以用stream方式分批次傳輸資料。對于用戶端還是服務端沒有限制,我們可以根據自己的需要使用stream方式,使用方式也非常的簡單,在定義接口的時候在參數或者傳回值前面加上stream然後就可以多次進行傳輸了,使用的代碼還是之前寫的例子,代碼都在github上:

    比如我的例子中定義了兩個使用stream的接口,一個隻在傳回值使用stream,另一個是在參數和傳回值前都加上了stream,最終的使用方式沒有差別

rpc Stream(model.SRequest) returns (stream model.SResponse) {}
    rpc BidirectionalStream(stream model.SRequest) returns (stream model.SResponse) {}           

看一下go-micro為我們生成的代碼rpcapi.micro.go裡,不要被吓到,生成了很多代碼,但是沒啥了解不了的

Server端

// Server API for Say service
type SayHandler interface {
    // .... others    
    Stream(context.Context, *model.SRequest, Say_StreamStream) error
    BidirectionalStream(context.Context, Say_BidirectionalStreamStream) error
}
type Say_StreamStream interface {
    SendMsg(interface{}) error
    RecvMsg(interface{}) error
    Close() error
    Send(*model.SResponse) error
}
type Say_BidirectionalStreamStream interface {
    SendMsg(interface{}) error
    RecvMsg(interface{}) error
    Close() error
    Send(*model.SResponse) error
    Recv() (*model.SRequest, error)
}
// .... others            

Client端

// Client API for Say service
type SayService interface {    
    //... others
    Stream(ctx context.Context, in *model.SRequest, opts ...client.CallOption) (Say_StreamService, error)
    BidirectionalStream(ctx context.Context, opts ...client.CallOption) (Say_BidirectionalStreamService, error)
}

type Say_StreamService interface {
    SendMsg(interface{}) error
    RecvMsg(interface{}) error
    Close() error
    Recv() (*model.SResponse, error)
}

type Say_BidirectionalStreamService interface {
    SendMsg(interface{}) error
    RecvMsg(interface{}) error
    Close() error
    Send(*model.SRequest) error
    Recv() (*model.SResponse, error)
}           

    你會發現參數前面加了 Stream後,生成的代碼會把你的參數變成一個接口,這個接口主要要的方法是

SendMsg(interface{}) error
    RecvMsg(interface{}) error
    Close() error           

剩下的兩個接口方法是根據你是發送還是接收生成的,如果有發送就會有Send(你的參數),如果有接收會生成Rev() (你的參數, error),但這兩個方法隻是為了讓你使用時友善,裡面調用的還是SendMsg(interface)和RecvMsg(interface)方法,但是他們是怎麼工作的,如何多次發送和接收傳輸的資料,是不是感覺很神奇。

我就以

TsBidirectionalStream

方法為例開始分析,上一篇和再早之前的文章已經說了服務端啟動的時候都做了哪些操作,這裡就不再贅述,

服務端的實作,很簡單,不斷的擷取用戶端發過來的資料,再給用戶端一次一次的傳回一些資料。

/*
 模拟資料
 */
func (s *Say) BidirectionalStream(ctx context.Context, stream rpcapi.Say_BidirectionalStreamStream) error {
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        for i := int64(0); i < req.Count; i++ {
            if err := stream.Send(&model.SResponse{Value: []string {lib.RandomStr(lib.Random(3, 6))}}); err != nil {
                return err
            }
        }
    }
    return nil
}           

啟動服務,服務開始監聽用戶端傳過來的資料.....

用戶端調用服務端方法:

// 調用 
func TsBidirectionalStream(client rpcapi.SayService) {
    rspStream, err := client.BidirectionalStream(context.Background())
    if err != nil {
        panic(err)
    }
    // send
    go func() {
        rspStream.Send(&model.SRequest{Count: 2})
        rspStream.Send(&model.SRequest{Count: 5})
        // close the stream
        if err := rspStream.Close(); err != nil {
            fmt.Println("stream close err:", err)
        }
    }()
     // recv
    idx := 1
    for  {
        rsp, err := rspStream.Recv()

        if err == io.EOF {
            break
        } else if err != nil {
            panic(err)
        }

        fmt.Printf("test stream get idx %d  data  %v\n", idx, rsp)
        idx++
    }
    fmt.Println("Read Value End")
}           

當用戶端在調用rpc的stream方法是要很得到stream

rspStream, err := client.BidirectionalStream(context.Background())
// 
func (c *sayService) BidirectionalStream(ctx context.Context, opts ...client.CallOption) (Say_BidirectionalStreamService, error) {
    req := c.c.NewRequest(c.name, "Say.BidirectionalStream", &model.SRequest{})
    stream, err := c.c.Stream(ctx, req, opts...)
    if err != nil {
        return nil, err
    }
    return &sayServiceBidirectionalStream{stream}, nil
}           

這個調用

c.c.Stream(ctx, req, opts...)

是關鍵,他的内部實作就是和伺服器進行連接配接,然後傳回一個stream,進行操作。

用戶端:和服務端建立連接配接,傳回Stream,進行接收和發送資料
服務端:接收用戶端連接配接請求,利用反射找到相應的方法,組織Strem,傳給方法,進行資料的發送和接收           

建立連接配接的時候就是一次rpc調用,服務端接受連接配接,然後用戶端發送一次調用,但是傳輸的是空資料,服務端利用反射找到具體的方法,組織stream,調用具體方法,利用這個連接配接,用戶端和服務端進行多次通信。