天天看點

使用golang寫一個redis-cli

0. redis通信協定

redis的用戶端(redis-cli)和服務端(redis-server)的通信是建立在tcp連接配接之上, 兩者之間資料傳輸的編碼解碼方式就是所謂的redis通信協定。是以,隻要我們的redis-cli實作了這個協定的解析和編碼,那麼我們就可以完成所有的redis操作。

redis 協定設計的非常易讀,也易于實作,關于具體的redis通信協定請參考:

通信協定(protocol)

。後面我們在實作這個協定的過程中也會簡單重複介紹一下具體實作

1. 建立tcp連接配接

redis用戶端和服務端的通信是建立tcp連接配接之上,是以第一步自然是先建立連接配接

package main

import (
	"flag"
	"log"
	"net"
)

var host string
var port string

func init() {
	flag.StringVar(&host, "h", "localhost", "hsot")
	flag.StringVar(&port, "p", "6379", "port")
}

func main() {
	flag.Parse()

	tcpAddr := &net.TCPAddr{IP: net.ParseIP(host), Port: port}
	conn, err := net.DialTCP("tcp", nil, tcpAddr)
	if err != nil {
		log.Println(err)
    }
    defer conn.Close()

	// to be continue
}           

後續我們發送和接受資料便都可以使用conn.Read()和conn.Write()來進行了

2. 發送請求

發送請求第一個第一個位元組是"*",中間是包含指令本身的參數個數,後面跟着"\r\n" 。之後使用"$"加參數位元組數量并使用"\r\n"結尾,然後緊跟參數内容同時也使用"\r\n"結尾。如執行 SET key liangwt 用戶端發送的請求為"*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$7\r\nliangwt\r\n"

注意:

  1. 指令本身也作為協定的其中一個參數來發送
  2. \r\n 對應byte的十進制為 13 10

我們可以使用telnet測試下

wentao@bj:~/github.com/liangwt/redis-cli$ telnet 127.0.0.1 6379
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
*3
$3
SET
$3
key
$7
liangwt
+OK           

先暫時忽略服務端的回複,通過telnet我們可以看出請求協定非常簡單,是以對于請求協定的實作不做過多的介紹了,直接放代碼(如下使用基于字元串拼接,隻是為了更直覺的示範,效率并不高,實際代碼中我們使用bytes.Buffer來實作)

func MultiBulkMarshal(args ...string) string {
	var s string
	s = "*"
	s += strconv.Itoa(len(args))
	s += "\r\n"

	// 指令所有參數
	for _, v := range args {
		s += "$"
		s += strconv.Itoa(len(v))
		s += "\r\n"
		s += v
		s += "\r\n"
	}

	return s
}           

在實作了對指令和參數進行編碼之後,我們便可以通過conn.Write()把資料推送到服務端

func main() {
    // ....
	req := MultiBulkMarshal("SET", "key", "liangwt")
	_, err = conn.Write([]byte(req))
	if err != nil {
		log.Fatal(err)
	}
	// to be continue
}           

3. 擷取回複

我們首先實作通過tcp擷取服務端傳回值,就是上面提到過的conn.Read()。

func main() {
    // ....
	p := make([]byte, 1024)
	_, err = conn.Read(p)
	if err != nil {
		log.Fatal(err)
	}
	// to be continue
}           

4. 解析回複

我們拿到p之後我們就可以解析傳回值了,redis服務端的回複是分為幾種情況的

  • 狀态回複
  • 錯誤回複
  • 整數回複
  • 批量回複
  • 多條批量回複

我們把前四種單獨看作一組,因為他們都是單一類型的傳回值

我們把最後的多條批量回複看成單獨的一組,因為它是包含前面幾種類型的混合類型。而且你可以發現它和我們的請求協定是一樣的

也正是基于以上的考慮我們建立兩個函數來分别解析單一類型和混合類型,這樣在解析混合類型中的某一類型時就隻需要調用單一類型解析的函數即可

在解析具體協定前我們先實作一個是讀取到\r\n為止的函數

func ReadLine(p []byte) ([]byte, error) {
	for i := 0; i < len(p); i++ {
		if p[i] == '\r' {
			if p[i+1] != '\n' {
				return []byte{}, errors.New("format error")
			}
			return p[0:i], nil
		}
	}
	return []byte{}, errors.New("format error")
}           

第一種狀态回複:

狀态回複是一段以 "+" 開始, "\r\n" 結尾的單行字元串。如 SET 指令成功的傳回值:"+OK\r\n"

是以我們判斷第一個字元是否等于 '+' 如果相等,則讀取到\r\n

func SingleUnMarshal(p []byte) ([]byte, int, error) {
	var (
		result []byte
		err    error
		length int
	)
	switch p[0] {
	case '+':
		result, err = ReadLine(p[1:])
		length = len(result) + 3
	}

	return result, length, err
}           

注:我們在傳回實際回複内容的同時也傳回了整個回複的長度,友善後面解析多條批量回複時定位下一次的解析位置

第二種錯誤回複:

錯誤回複的第一個位元組是 "-", "\r\n" 結尾的單行字元串。如執行 SET key缺少參數時傳回值:"-ERR wrong number of arguments for 'set' command\r\n"

錯誤回複和狀态回複非常相似,解析方式也是一樣到。是以我們隻需添加一個case即可

func SingleUnMarshal(p []byte) ([]byte, int, error) {
	var (
		result []byte
		err    error
		length int
	)
	switch p[0] {
	case '+', '-':
		result, err = ReadLine(p[1:])
		length = len(result) + 3
	}
	return result, length, err
}           

第三種整數回複:

整數回複的第一個位元組是":",中間是字元串表示的整數,"\r\n" 結尾的單行字元串。如執行LLEN mylist指令時傳回 ":10\r\n"

整數回複也和上面兩種是一樣的,隻不過傳回的是字元串表示的十進制整數

func SingleUnMarshal(p []byte) ([]byte, int, error) {
	var (
		result []byte
		err    error
		length int
	)
	switch p[0] {
	case '+', '-', ':':
		result, err = ReadLine(p[1:])
		length = len(result) + 3
	}
	return result, length, err
}           

第四種批量回複:

批量回複的第一個位元組為 "$",接下來是字元串表示的整數,它表示實際回複的長度,之後跟着一個 "\r\n",再後面跟着的是實際回複資料,最末尾是另一個 "\r\n"。如GET key 指令的傳回值:"$7\r\nliangwt\r\n"

是以批量回複解析的實作:

  • 讀取第一行得到實際回複的長度
  • 把字元串類型的長度轉換成對應十進制整數
  • 從第二行開始位置往下讀對應長度

但是對于某些不存在的key,批量回複會将特殊值 -1 用作回複的長度值, 此時我們不需要繼續往下讀取實際回複。例如GET NOT_EXIST_KEY 傳回值:"$-1", 是以我們需要對此特殊情況判斷,讓函數傳回一個空對象(nil)而不是空值("")

func SingleUnMarshal(p []byte) ([]byte, int, error) {
	// ....
	case '$':
		n, err := ReadLine(p[1:])
		if err != nil {
			return []byte{}, 0, err
		}
		l, err := strconv.Atoi(string(n))
		if err != nil {
			return []byte{}, 0, err
		}
		if l == -1 {
			return nil, 0, nil
		}
		// +3 的原因 $ \r \n 三個字元
		result = p[len(n)+3 : len(n)+3+l]
		length = len(n) + 5 + l
	}
	return result, length, err
}           

思考:

為什麼redis要使用提前告知位元組數,然後往下讀取指定長度的方式,而不是直接讀取第二行到\r\n為止?

答案很明顯:此方式可以讓redis讀取傳回值時不受具體的傳回内容影響,在按行讀取的情況下,無論使用任何分割符都有可能導緻redis在解析具體内容時把内容中的分割符當作時結尾,導緻解析錯誤。

思考一下這種情況:我們SET key "liang\r\nwt" ,那麼當我們GET key時,服務端傳回值為"$9\r\nliang\r\nwt\r\n" 完全規避了value中的\r\n影響

第五種多條批量回複:

多條批量回複是由多個回複組成的數組,它的第一個位元組為"*", 後跟一個字元串表示的整數值, 這個值記錄了多條批量回複所包含的回複數量, 再後面是一個"\r\n"。如LRANGE mylist 0 -1的傳回值:"*3\r\n$1\r\n3\r\n$1\r\n2\r\n$1\r\n1"。

是以多條批量回複解析的實作:

  • 解析第一行資料獲得字元串類型的回複數量
  • 按照單條回複依次逐個解析,一共解析成上面得到的數量

在這裡我們用到了單條解析時傳回的位元組長度length,通過這個長度我們可以很友善的知道下次單條解析的開始位置為上一次位置+length

在解析多條批量回複時需要注意兩點:

第一,多條批量回複也可以是空白的(empty)。例如執行LRANGE NOT_EXIST_KEY 0 -1 服務端傳回值"*0\r\n"。此時用戶端傳回的應該空數組[][]byte

第二,多條批量回複也可以是無内容的(null multi bulk reply)。例如執行BLPOP key 1 服務端傳回值"*-1\r\n"。此時用戶端傳回的應該是nil

func MultiUnMarsh(p []byte) ([][]byte, error) {
	if p[0] != '*' {
		return [][]byte{}, errors.New("format error")
	}
	n, err := ReadLine(p[1:])
	if err != nil {
		return [][]byte{}, err
	}
	l, err := strconv.Atoi(string(n))
	if err != nil {
		return [][]byte{}, err
	}
	// 多條批量回複也可以是空白的(empty)
	if l == 0 {
		return [][]byte{}, nil
	}

	// 無内容的多條批量回複(null multi bulk reply)也是存在的,
	// 用戶端庫應該傳回一個 null 對象, 而不是一個空數組。
	if l == -1 {
		return nil, nil
	}
	result := make([][]byte, l)
	t := len(n) + 3
	for i := 0; i < l; i++ {
		ret, length, err := SingleUnMarshal(p[t:])
		if err != nil {
			return [][]byte{}, errors.New("format error")
		}
		result[i] = ret
		t += length
	}

	return result, nil
}           

5. 指令行模式

一個可用的redis-cli自然是一個互動式的,使用者輸入指令然後輸出傳回值。在go中我們可以使用以下代碼即可獲得一個類似的互動式指令行

func main() {
	// ....
	for {
		fmt.Printf("%s:%d>", host, port)

		bio := bufio.NewReader(os.Stdin)
		input, _, err := bio.ReadLine()
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%s\n", input)
	}
}           

我們運作以上代碼就可以實作

localhost:6379>set key liang
set key liang
localhost:6379>get key
get key
localhost:6379>           

結合上我們的redis發送請求和解析請求即可完成整個redis-cli

func main() {
    // ....
	for {
		fmt.Printf("%s:%d>", host, port)

		// 擷取輸入指令和參數
		bio := bufio.NewReader(os.Stdin)
		input, err := bio.ReadString('\n')
		if err != nil {
			log.Fatal(err)
		}
		fields := strings.Fields(input)

		// 編碼發送請求
		req := MultiBulkMarshal(fields...)

		// 發送請求
		_, err = conn.Write([]byte(req))
		if err != nil {
			log.Fatal(err)
		}

		// 讀取傳回内容
		p := make([]byte, 1024)
		_, err = conn.Read(p)
		if err != nil {
			log.Fatal(err)
		}

		// 解析傳回内容
		if p[0] == '*' {
			result, err := MultiUnMarsh(p)
		} else {
			result, _, err := SingleUnMarshal(p)
		}

    }
    // ....
}           

6. 總結

到目前為止我們的cli程式已經全部完成,但其實還有很多不完美地方。但核心的redis協定解析已經完成,使用這個解析我們能完成任何的cli與伺服器之間的互動

本文來自雲栖社群合作夥伴“開源中國”

本文作者: liangwt 

原文連結