天天看點

Golang 編寫 Tcp 伺服器Echo 伺服器拆包與粘包優雅關閉

Golang 作為廣泛用于服務端和雲計算領域的程式設計語言,tcp socket 是其中至關重要的功能。無論是 WEB 伺服器還是各類中間件都離不開 tcp socket 的支援。

  • Echo 伺服器
  • 拆包與粘包
  • 優雅關閉

與早期的每個線程持有一個 socket 的 block IO 模型不同, 多路IO複用模型使用單個線程監聽多個 socket, 當某個 socket 準備好資料後再進行響應。在邏輯上與使用 select 語句監聽多個 channel 的模式相同。

目前主要的多路IO複用實作主要包括: SELECT, POLL 和 EPOLL。 為了提高開發效率社群也出現很多封裝庫, 如Netty(Java), Tornado(Python) 和 libev(C)等。

Golang Runtime 封裝了各作業系統平台上的多路IO複用接口, 并允許使用 goroutine 快速開發高性能的 tcp 伺服器。

Echo 伺服器

作為開始,我們來實作一個簡單的 Echo 伺服器。它會接受用戶端連接配接并将用戶端發送的内容原樣傳回用戶端。

package main

import (
    "fmt"
    "net"
    "io"
    "log"
    "bufio"
)

func ListenAndServe(address string) {
    // 綁定監聽位址
    listener, err := net.Listen("tcp", address)
    if err != nil {
        log.Fatal(fmt.Sprintf("listen err: %v", err))
    }
    defer listener.Close()
    log.Println(fmt.Sprintf("bind: %s, start listening...", address))

    for {
        // Accept 會一直阻塞直到有新的連接配接建立或者listen中斷才會傳回
        conn, err := listener.Accept()
        if err != nil {
            // 通常是由于listener被關閉無法繼續監聽導緻的錯誤
            log.Fatal(fmt.Sprintf("accept err: %v", err))
        }
        // 開啟新的 goroutine 處理該連接配接
        go Handle(conn)
    }
}

func Handle(conn net.Conn) {
    // 使用 bufio 标準庫提供的緩沖區功能
    reader := bufio.NewReader(conn)
    for {
        // ReadString 會一直阻塞直到遇到分隔符 '\n'
        // 遇到分隔符後會傳回上次遇到分隔符或連接配接建立後收到的所有資料, 包括分隔符本身
        // 若在遇到分隔符之前遇到異常, ReadString 會傳回已收到的資料和錯誤資訊
        msg, err := reader.ReadString('\n')
        if err != nil {
            // 通常遇到的錯誤是連接配接中斷或被關閉,用io.EOF表示
            if err == io.EOF {
                log.Println("connection close")
            } else {
                log.Println(err)
            }
            return
        }
        b := []byte(msg)
        // 将收到的資訊發送給用戶端
        conn.Write(b)
    }
}

func main() {
    ListenAndServe(":8000")
}
           

使用 telnet 工具測試我們編寫的 Echo 伺服器:

$ telnet 127.0.0.1 8000
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
> a
a
> b
b
Connection closed by foreign host.
           

拆包與粘包

HTTP 等應用層協定隻有收到一條完整的消息後才能進行處理,而工作在傳輸層的TCP協定并不了解應用層消息的結構。

是以,可能遇到一條應用層消息分為兩個TCP包發送或者一個TCP包中含有兩條應用層消息片段的情況,前者稱為拆包後者稱為粘包。

在 Echo 伺服器的示例中,我們定義用

\n

表示消息結束。我們可能遇到下列幾種情況:

  1. 收到兩個 tcp 包: "abc", "def\n", 應發出一條響應 "abcdef\n", 這是拆包的情況
  2. 收到一個 tcp 包: "abc\ndef\n", 應發出兩條響應 "abc\n", "def\n", 這是粘包的情況

當我們使用 tcp socket 開發應用層程式時必須正确處理拆包和粘包。

bufio 标準庫會緩存收到的資料直到遇到分隔符才會傳回,它可以正确處理拆包和粘包。

上層協定通常采用下列幾種思路之一來定義消息,以保證完整地進行讀取:

  • 定長消息
  • 在消息尾部添加特殊分隔符,如示例中的Echo協定和FTP控制協定
  • 将消息分為header 和 body, 并在 header 提供消息總長度。這是應用最廣泛的政策,如HTTP協定。

優雅關閉

在生産環境下需要保證TCP伺服器關閉前完成必要的清理工作,包括将完成正在進行的資料傳輸,關閉TCP連接配接等。這種關閉模式稱為優雅關閉,可以避免資源洩露以及用戶端未收到完整資料造成異常。

TCP 伺服器的優雅關閉模式通常為: 先關閉listener阻止新連接配接進入,然後周遊所有連接配接逐個進行關閉。

本節完整源代碼位址: https://github.com/HDT3213/godis/tree/master/src/server

首先修改一下TCP伺服器:

// handler 是應用層伺服器的抽象
type Handler interface {
    Handle(ctx context.Context, conn net.Conn)
    Close()error
}

func ListenAndServe(cfg *Config, handler tcp.Handler) {
    listener, err := net.Listen("tcp", cfg.Address)
    if err != nil {
        logger.Fatal(fmt.Sprintf("listen err: %v", err))
    }

    // 監聽中斷信号
    // atomic.AtomicBool 是作者寫的封裝: https://github.com/HDT3213/godis/blob/master/src/lib/sync/atomic/bool.go
    var closing atomic.AtomicBool 
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
    go func() {
        sig := <-sigCh
        switch sig {
        case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
            // 收到中斷信号後開始關閉流程
            logger.Info("shuting down...")
            // 設定标志位為關閉中, 使用原子操作保證線程可見性
            closing.Set(true)
            // listener 關閉後 listener.Accept() 會立即傳回錯誤
            listener.Close() 
        }
    }()


    logger.Info(fmt.Sprintf("bind: %s, start listening...", cfg.Address))
    // 在出現未知錯誤或panic後保證正常關閉
    // 注意defer順序,先關閉 listener 再關閉應用層伺服器 handler
    defer handler.Close()
    defer listener.Close()
    ctx, _ := context.WithCancel(context.Background())
    for {
        conn, err := listener.Accept()
        if err != nil {
            if closing.Get() {
                // 收到關閉信号後進入此流程,此時listener已被監聽系統信号的 goroutine 關閉
                // handler 會被上文的 defer 語句關閉直接傳回
                return 
            }
            logger.Error(fmt.Sprintf("accept err: %v", err))
            continue
        }
        // handle
        logger.Info("accept link")
        go handler.Handle(ctx, conn)
    }
}
           

接下來修改應用層伺服器:

// 用戶端連接配接的抽象
type Client struct {
    // tcp 連接配接
    Conn net.Conn
    // 當服務端開始發送資料時進入waiting, 阻止其它goroutine關閉連接配接
    // wait.Wait是作者編寫的帶有最大等待時間的封裝: 
    // https://github.com/HDT3213/godis/blob/master/src/lib/sync/wait/wait.go
    Waiting wait.Wait
}

type EchoHandler struct {
    
    // 儲存所有工作狀态client的集合(把map當set用)
    // 需使用并發安全的容器
    activeConn sync.Map 

    // 和 tcp server 中作用相同的關閉狀态辨別位
    closing atomic.AtomicBool
}

func MakeEchoHandler()(*EchoHandler) {
    return &EchoHandler{
    }
}

// 關閉用戶端連接配接
func (c *Client)Close()error {
    // 等待資料發送完成或逾時
    c.Waiting.WaitWithTimeout(10 * time.Second)
    c.Conn.Close()
    return nil
}

func (h *EchoHandler)Handle(ctx context.Context, conn net.Conn) {
    if h.closing.Get() {
        // closing handler refuse new connection
        conn.Close()
    }

    client := &Client {
        Conn: conn,
    }
    h.activeConn.Store(client, 1)

    reader := bufio.NewReader(conn)
    for {
        msg, err := reader.ReadString('\n')
        if err != nil {
            if err == io.EOF {
                logger.Info("connection close")
                h.activeConn.Delete(conn)
            } else {
                logger.Warn(err)
            }
            return
        }
        // 發送資料前先置為waiting狀态
        client.Waiting.Add(1)

        // 模拟關閉時未完成發送的情況
        //logger.Info("sleeping")
        //time.Sleep(10 * time.Second)

        b := []byte(msg)
        conn.Write(b)
        // 發送完畢, 結束waiting
        client.Waiting.Done()
    }
}

func (h *EchoHandler)Close()error {
    logger.Info("handler shuting down...")
    h.closing.Set(true)
    // TODO: concurrent wait
    h.activeConn.Range(func(key interface{}, val interface{})bool {
        client := key.(*Client)
        client.Close()
        return true
    })
    return nil
}
           

轉載于:https://www.cnblogs.com/Finley/p/11070669.html

繼續閱讀