天天看點

從0開始,手寫Redis

作者:架構師尼恩

▌說在前面:

從0開始,手寫一個Redis的學習價值在于:

  • 可以深入地了解Redis的内部機制和原理,Redis可謂是面試的絕對重點和難點
  • 進而更好地掌握Redis的使用和優化
  • 幫助你提高程式設計能力和解決問題的能力
  • 手寫一個Redis可以作為一個優質的履歷輪子項目,注意,是高品質的履歷輪子項目

很多小夥伴的項目都很low,極度缺乏輪子項目,so,輪子項目這就來了。

尼恩從架構師視角出發,基于尼恩 3高架構知識宇宙,寫一本《從0開始,手寫Redis》PDF。

後面會不斷更新,不斷 疊代, 變成大家學習和面試的必讀書籍。

《尼恩 架構筆記》《尼恩高并發三部曲》《尼恩Java面試寶典》的PDF,請到公号【技術自由圈】取

▌本文目錄:

- 說在前面
- 作者介紹
- 實作 Redis 協定解析器
  - 1、Redis網絡協定詳解
    - 1.1 正常回複
    - 1.2 錯誤回複
    - 1.3 整數
    - 1.4 多行字元串
    - 1.5 數組
  - 2、實作constreply
    - 2.1 Connection接口
    - 2.2 Reply接口
    - 2.3 正常的常量回複
      - 2.3.1 PongReply
      - 2.3.2 OKReply
      - 2.3.3 NullBulkReply
      - 2.3.4 EmptyMultiBulkReply
      - 2.3.5 NoReply
    - 2.4 錯誤的常量回複
      - 2.4.1 UnknownErrReply
      - 2.4.2 ArgNumErrReply
      - 2.4.3 SyntaxErrReply
      - 2.4.4 WrongTypeErrReply
      - 2.4.5 ProtocolErrReply
    - 2.5 實作自定義Reply
      - 2.5.1 CRLF
      - 2.5.2 BulkReply
      - 2.5.3 MultiBulkReply
      - 2.5.4 StatusReply
      - 2.5.4 IntReply
      - 2.5.4 StandardErrReply
  - 3、實作ParseStream
    - 3.1 Payload
    - 3.2 readState
    - 3.3 finished()
    - 3.4 ParseStream()
    - 3.5 readLine()
    - 3.6 parseMultiBulkHeader
    - 3.7 parseBulkHeader
    - 3.8 parseSingleLineReply
    - 3.9 readBody
    - 3.10 parse0()
  - 4、實作Connection
    - 4.1 Connection結構體
    - 4.2 close方法
    - 4.3 RemoteAddr
    - 4.4 Write
  - 5、實作RespHandler
    - 5.1 定義Database 接口
    - 5.2 RespHandler結構體
      - 5.2.1 RespHandler
      - 5.2.2 Close()方法
      - 5.2.3 closeClient方法
      - 5.2.4 實作Handle方法
  - 6、測試
    - 6.1 建立一個EchoDatabase,它實作了database.Database接口:
    - 6.2 定義方法MakeHandler
- 實作記憶體資料庫
  - 1、定義Dict接口
  - 2、為Dict接口寫一個實作
    - 2.1 SyncDict結構體
    - 2.2 Get方法
    - 2.3 Put方法
    - 2.4 Len方法
    - 2.5 PutIfAbsent方法
    - 2.6 PutIfExists方法
    - 2.7 Remove方法
    - 2.8 ForEach方法
    - 2.9 Keys方法
    - 2.10 RandomKeys方法
    - 2.11 RandomDistinctKeysKeys方法
  - 3、DB結構體
    - 3.1 DB結構體
    - 3.2 ExecFunc接口
    - 3.3 Exec方法
    - 3.4 GetEntity方法
    - 3.5 PutEntity方法
    - 3.6 PutIfExists方法
    - 3.6 PutIfAbsent方法
  - 4、command結構體
  - 5、實作ping指令
  - 6、實作KEY指令
    - 6.1 execDel方法
    - 6.2 execExists方法
    - 6.3 execFlushDB方法
    - 6.4 execType方法
    - 6.5 execRename方法
    - 6.6 execRenameNx方法
    - 6.7 execKeys方法
    - 6.8 初始化方法
  - 7、實作string指令
    - 7.1 getAsString方法
    - 7.2 execGet方法
    - 7.3 execSet方法
    - 7.4 execSetNX方法
    - 7.5 execGetSet方法
    - 7.6 execStrLen方法
    - 7.7 init方法
  - 8、Database
    - 8.1 NewDatabase
    - 8.2 Exec方法
    - 8.3 execSelect方法
- GO實作Redis持久化
  - 1、AofHandler
    - 1.1 AofHandler結構體
    - 1.2 實作AddAof
    - 1.3 實作handleAof
  - 2、實作Aof落盤功能
    - 2.1 Database結構體
    - 2.2 改造keys指令
    - 2.3 改造strings指令
  - 3、實作Aof恢複
    - 實作LoadAof方法
- 說在後面
- 11個相關技術 PDF           

▌作者介紹:

一作:Khan, 架構師,十年背景開發經驗 精通spring、rocketmq、多線程等架構和中間件源碼。

二作: 尼恩,資深系統架構師、IT領域資深作家、著名部落客。近20年高性能Web平台、高性能通信、高性能搜尋、資料挖掘架構經驗。資深的轉架構導師,成功指導了N多個小夥更新架構師,有的小夥拿到年薪90W。

▌實作 Redis 協定解析器:

首先,從0開始,實作協定解析器。

▌1、Redis網絡協定詳解

那實戰之前,需要先搞清楚什麼叫red的協定。 Redis用的什麼協定?所謂的redis的協定就叫resp協定,它的全稱叫REdis Serialization Portocol(RESP),就是redis的序列化協定,也就是reids的用戶端和服務端通信的協定。即通過TCP連接配接,發送什麼樣的資料才能代表redis的通信。

RESP協定有五種資料的格式

  1. 正常回複
  2. 錯誤回複
  3. 整數
  4. 多行字元串
  5. 數組

▌1.1 正常回複

正常回複,是以+開頭,以\r\n結尾,大家都知道\r\n是CRLF對,是Windows系統常用的這個換行符,那麼redis這裡呢,也借鑒了Windows的這個換行符。 舉個例子,比如說我們對redis發出一個指令,redis會回給我們一個正常。

正常就是OK,那他回給我們OK怎麼回呢?

就是:

+OK\r\n

+表示這是一個正常回複,OK是正常回複的内容,\r\n是結束。

▌1.2 錯誤回複

錯誤回複是以-開頭,以\r\n結尾的字元串形式,就是你發送的這個指令不對有問題。 redis會給你回複一個錯誤

例如:

-Error message\r\n

-表示這是一個錯誤回複,message是錯誤回複的内容,\r\n是結束。

▌1.3 整數

這個就不是單單是redis 服務端回複給用戶端的消息了,是用戶端和redis 服務端互相通信,用戶端要發送一個整數的話,那就用這麼一個格式,是:開頭寫整數,然後以\r\n結束。

例如:

:123456\r\n

▌1.4 多行字元串

多行字元串是以$開頭,後面跟位元組數以杠\r\n結尾

比如,如果要發送hello world,那麼:

$11\r\nhello world\r\n

如果是空字元串,就是:

$0\r\n\r\n

如果字元串裡面本來就有\r\n,比如hello\r\nworld那麼:

$14\r\nhello\r\nworld\r\n

▌1.5 數組

數組是以*開頭,後面跟成員的個數

比如set key value這個字元串

TCP網絡封包則為以下樣式:

*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n

*後面的3表示有三個成員,$3表示set是三個字元,下一個$3表示key是三個字元,$5表示value是五個字元。

▌2、實作constreply

▌2.1 Connection接口

Connection是在redis的協定層,它代表着一個redis的連接配接,之是以要寫接口,因為未來Connection會有不同的實作,它跟持久化是相關的。它有什麼方法呢?

一個是GetDBIndex方法,它是什麼意思呢?大家知道redis通常它會有DB的概念,比如說它預設有16個DB,就是說每一個DB之間的KV是隔離開的。用戶端連接配接查詢的時候需要知道現在用的是哪個DB,就需要這個方法。

還有SelectDB方法,這個指令是切DB庫,Redis的核心可能有16個庫,甚至更多,那如果客戶想在各個這個庫之間去切換,其實就是切換 這個客戶他的辨別。

type Connection interface {
   GetDBIndex() int
   SelectDB(int)
}           

▌2.2 Reply接口

這個接口主要是來代表一類資料,這一類資料是各種服務端對用戶端的回複,叫Reply,都可以用這個接口來表示,是以就有很多的實作,它必須要實作一個方法叫ToBytes,就是把我們的回複的内容轉成位元組。之是以要把回複的内容轉成位元組,是因為TCP協定來回寫的就是寫位元組流。

type Reply interface {
   ToBytes() []byte
}           

▌2.3 正常的常量回複

建立一個consts.go檔案,consts.go檔案用于儲存一些固定的回複

◆2.3.1 PongReply

Redis有一個ping指令,用戶端發ping,服務端就要回複pong,建立一個PongReply實作Reply接口:

/**
按照RESP協定的約定,正常回複以"+"開頭,以"\r\n"結尾
 */
var pongbytes = []byte("+PONG\r\n")

func (r PongReply) ToBytes() []byte {
   return pongbytes;
}           

寫一個make方法,友善調用:

func MakePongReply() *PongReply {
   return &PongReply{}
}           

◆2.3.2 OKReply

同理,再寫一個回複OK的Reply:

// OkReply is +OK
type OkReply struct{}

var okBytes = []byte("+OK\r\n")

// ToBytes marshal redis.Reply
func (r *OkReply) ToBytes() []byte {
  return okBytes
}
var theOkReply = new(OkReply)

// MakeOkReply returns a ok protocol
func MakeOkReply() *OkReply {
  return theOkReply
}           

◆2.3.3 NullBulkReply

這是一個空的塊回複:

var nullBulkBytes = []byte("$-1\r\n")

// NullBulkReply is empty string
type NullBulkReply struct{}

// ToBytes marshal redis.Reply
func (r *NullBulkReply) ToBytes() []byte {
  return nullBulkBytes
}

// MakeNullBulkReply creates a new NullBulkReply
func MakeNullBulkReply() *NullBulkReply {
  return &NullBulkReply{}
}           

◆2.3.4 EmptyMultiBulkReply

空數組回複:

var emptyMultiBulkBytes = []byte("*0\r\n")

// EmptyMultiBulkReply is a empty list
type EmptyMultiBulkReply struct{}

// ToBytes marshal redis.Reply
func (r *EmptyMultiBulkReply) ToBytes() []byte {
  return emptyMultiBulkBytes
}

// MakeEmptyMultiBulkReply creates EmptyMultiBulkReply
func MakeEmptyMultiBulkReply() *EmptyMultiBulkReply {
  return &EmptyMultiBulkReply{}
}           

◆2.3.5 NoReply

空回複:

type NoReply struct{}

var noBytes = []byte("")

// ToBytes marshal redis.Reply
func (r *NoReply) ToBytes() []byte {
  return noBytes
}           

▌2.4 錯誤的常量回複

建立一個reply.go檔案,在reply.go檔案為所有錯誤回複類型寫一個接口:

type ErrorReply interface {
  Error() string
  ToBytes() []byte
}           

這一個接口實作了兩個接口:實作了系統的builtin.go下的error接口以及resp\reply.go下的Reply接口,是以這是一個把兩個接口縫合在一起的接口。

建立一個error.go,裡面全是各種錯誤回複

◆2.4.1 UnknownErrReply

未知錯誤:

type UnknownErrReply struct{}

var unknownErrBytes = []byte("-Err unknown\r\n")

// ToBytes marshals redis.Reply
func (r *UnknownErrReply) ToBytes() []byte {
  return unknownErrBytes
}

func (r *UnknownErrReply) Error() string {
  return "Err unknown"
}           

◆2.4.2 ArgNumErrReply

參數個數錯誤:

/*
*
Cmd用于将用戶端傳過來的指令回複過去
*/ 
type ArgNumErrReply struct {
  Cmd string
}

// ToBytes marshals redis.Reply
func (r *ArgNumErrReply) ToBytes() []byte {
  return []byte("-ERR wrong number of arguments for '" + r.Cmd + "' command\r\n")
}

func (r *ArgNumErrReply) Error() string {
  return "ERR wrong number of arguments for '" + r.Cmd + "' command"
}

// MakeArgNumErrReply represents wrong number of arguments for command
func MakeArgNumErrReply(cmd string) *ArgNumErrReply {
  return &ArgNumErrReply{
    Cmd: cmd,
  }
}           

◆2.4.3 SyntaxErrReply

文法錯誤:

type SyntaxErrReply struct{}

var syntaxErrBytes = []byte("-Err syntax error\r\n")
var theSyntaxErrReply = &SyntaxErrReply{}

// MakeSyntaxErrReply creates syntax error
func MakeSyntaxErrReply() *SyntaxErrReply {
  return theSyntaxErrReply
}

// ToBytes marshals redis.Reply
func (r *SyntaxErrReply) ToBytes() []byte {
  return syntaxErrBytes
}

func (r *SyntaxErrReply) Error() string {
  return "Err syntax error"
}           

◆2.4.4 WrongTypeErrReply

資料類型錯誤:

type WrongTypeErrReply struct{}

var wrongTypeErrBytes = []byte("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n")

// ToBytes marshals redis.Reply
func (r *WrongTypeErrReply) ToBytes() []byte {
  return wrongTypeErrBytes
}

func (r *WrongTypeErrReply) Error() string {
  return "WRONGTYPE Operation against a key holding the wrong kind of value"
}           

◆2.4.5 ProtocolErrReply

接口協定錯誤,用戶端發過來的指令不符合RESP規範時的回複:

type ProtocolErrReply struct {
  Msg string
}

// ToBytes marshals redis.Reply
func (r *ProtocolErrReply) ToBytes() []byte {
  return []byte("-ERR Protocol error: '" + r.Msg + "'\r\n")
}

func (r *ProtocolErrReply) Error() string {
  return "ERR Protocol error '" + r.Msg + "' command"
}           

▌2.5 實作自定義Reply

◆2.5.1 CRLF

定義一個變量CRLF,用于表示Redis序列化協定中的行分隔符。Redis是一個記憶體資料庫,使用自定義的序列化協定進行資料傳輸和存儲。在Redis序列化協定中,每個資料結構都以某種特定的格式進行序列化,并且使用\r\n作為行分隔符

var (
  CRLF = "\r\n"
)           

◆2.5.2 BulkReply

BulkReply是用來定義對字元串的回複的。

type BulkReply struct {
  Arg []byte
}           

BulkReply要實作Reply接口就要重寫ToBytes()方法,比如,如果輸出是"hello world",根據RESP協定,就要轉換為"$11\r\nhello world\r\n",strconv.Itoa(len(r.Arg))把參數的長度轉化為字元串,是以ToBytes()的最終實作為:

func (r *BulkReply) ToBytes() []byte {
  if r.Arg == nil {
    return nullBulkBytes
  }
  return []byte("#34; + strconv.Itoa(len(r.Arg)) + CRLF + string(r.Arg) + CRLF)
}           

◆2.5.3 MultiBulkReply

數組的回複,注意參數Args [][]byte是個二維數組:

type MultiBulkReply struct {
  Args [][]byte
}

// MakeMultiBulkReply creates MultiBulkReply
func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {
  return &MultiBulkReply{
    Args: args,
  }
}

// ToBytes marshal redis.Reply
func (r *MultiBulkReply) ToBytes() []byte {
  argLen := len(r.Args)
  var buf bytes.Buffer
  buf.WriteString("*" + strconv.Itoa(argLen) + CRLF)
  for _, arg := range r.Args {
    if arg == nil {
      buf.WriteString("$-1" + CRLF)
    } else {
      buf.WriteString("#34; + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF)
    }
  }
  return buf.Bytes()
}           

上面這段代碼定義了一個MultiBulkReply類型的方法ToBytes(),用于将MultiBulkReply類型的對象轉換為位元組數組([]byte類型)。

在Redis協定中,批量回複的資料類型由一個以*開頭的整數表示,後跟一組以$開頭的資料塊。每個資料塊都由一個以$開頭的長度表示,後跟實際資料和一個行分隔符組成。

在ToBytes()方法中,首先擷取MultiBulkReply類型對象中參數的數量argLen。然後,使用bytes.Buffer類型的變量buf建立一個緩沖區,并将*和參數數量寫入緩沖區。

接着,周遊MultiBulkReply類型對象中的所有參數。如果參數為nil,則将$-1寫入緩沖區,表示該參數為空。否則,将參數的長度和資料塊寫入緩沖區。

最後,将緩沖區轉換為位元組數組并傳回。該方法傳回的位元組數組可以用于網絡傳輸或持久化存儲等操作。

◆2.5.4 StatusReply

狀态回複:

type StatusReply struct {
  Status string
}

// MakeStatusReply creates StatusReply
func MakeStatusReply(status string) *StatusReply {
  return &StatusReply{
    Status: status,
  }
}

// ToBytes marshal redis.Reply
func (r *StatusReply) ToBytes() []byte {
  return []byte("+" + r.Status + CRLF)
}           

◆2.5.4 IntReply

數字回複:

type IntReply struct {
  Code int64
}

// MakeIntReply creates int protocol
func MakeIntReply(code int64) *IntReply {
  return &IntReply{
    Code: code,
  }
}

// ToBytes marshal redis.Reply
func (r *IntReply) ToBytes() []byte {
  return []byte(":" + strconv.FormatInt(r.Code, 10) + CRLF)
}           

◆2.5.4 StandardErrReply

标準錯誤回複:

// StandardErrReply represents server error
type StandardErrReply struct {
  Status string
}

// MakeErrReply creates StandardErrReply
func MakeErrReply(status string) *StandardErrReply {
  return &StandardErrReply{
    Status: status,
  }
}


// ToBytes marshal redis.Reply
func (r *StandardErrReply) ToBytes() []byte {
  return []byte("-" + r.Status + CRLF)
}

func (r *StandardErrReply) Error() string {
  return r.Status
}           

判斷是不是錯誤,根據第一個位元組是不是"-":

func IsErrorReply(reply resp.Reply) bool {
  return reply.ToBytes()[0] == '-'
}           

▌3、實作ParseStream

用戶端發給服務端的二進制位元組流,裡邊有*、$以及\r\n,這一節,我們就來把它們解析成真正的含義。

建立一個parser.go用于處理位元組流的解析

▌3.1 Payload

定義一個Payload類型,用于表示Redis指令的傳回結果。

Payload類型包含兩個字段:Data和Err。Data字段的類型是resp.Reply,表示Redis指令的傳回結果。resp包提供了Redis協定的解析和序列化功能,Reply類型是解析後的Redis指令傳回結果的資料類型之一。Err字段的類型是error,表示Redis指令執行過程中可能出現的錯誤。

使用Payload類型可以友善地封裝Redis指令的傳回結果,并進行傳遞和處理。在使用Payload類型時,可以通過檢查Err字段來判斷Redis指令是否執行成功,并通過Data字段擷取Redis指令的傳回結果。

type Payload struct {
  Data resp.Reply
  Err  error
}           

▌3.2 readState

定義一個readState類型,用于解析Redis協定中的讀取狀态。

readState類型包含以下字段:

  • readingMultiLine表示是否正在讀取多行資料。
  • expectedArgsCount表示預期讀取的參數數量。
  • msgType表示目前讀取的消息類型。
  • args表示已經讀取的參數清單。
  • bulkLen表示目前正在讀取的塊資料的長度。

使用readState類型可以友善地記錄Redis協定解析的狀态,并在解析完整個協定時傳回解析結果。在解析Redis指令時,可以通過讀取狀态來判斷目前正在讀取的協定資料類型,并逐漸解析出完整的Redis指令和參數清單。

type readState struct {
  readingMultiLine  bool
  expectedArgsCount int
  msgType           byte
  args              [][]byte
  bulkLen           int64
}           

▌3.3 finished()

定義一個readState類型的方法finished(),用于判斷是否已經完成了Redis協定的解析。

在Redis協定中,每個指令的參數數量是固定的,是以可以通過讀取狀态中的expectedArgsCount字段來判斷是否已經讀取了所有的參數。如果已經讀取了所有的參數,則說明協定解析已經完成。

在finished()方法中,如果expectedArgsCount大于0且已經讀取的參數數量等于expectedArgsCount,則傳回true,表示已經完成了Redis協定的解析。否則,傳回false,表示解析還未完成。

func (s *readState) finished() bool {
  return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
}           

▌3.4 ParseStream()

定義一個ParseStream函數,用于解析Redis協定的資料流并傳回一個通道(channel)以供讀取解析結果。

ParseStream函數的第一個參數是一個實作了io.Reader接口的對象,用于從資料流中讀取Redis協定的資料。第二個參數是一個隻讀通道,用于向調用者傳回解析結果。

在ParseStream函數中,首先建立一個緩沖區通道ch,用于在解析過程中存儲解析結果。然後,啟動一個協程parse0來執行解析操作,将解析結果寫入緩沖區通道中。最後,傳回緩沖區通道,供調用者讀取解析結果。

通過使用通道,ParseStream函數可以在解析Redis協定的資料流時,實作異步、非阻塞的解析過程。這種方式可以提高解析效率,避免阻塞和死鎖等問題。

func ParseStream(reader io.Reader) <-chan *Payload {
  ch := make(chan *Payload)
  go parse0(reader, ch)
  return ch
}           

▌3.5 readLine()

定義一個readLine函數,用來讀取一行資料,它有兩個參數:一個是指向 bufio.Reader 的指針,另一個是指向 readState 結構體的指針。該函數傳回三個值:一個位元組切片 msg,一個布爾值表示讀取是否完成,和一個錯誤值。

該函數從 bufio.Reader 中讀取資料,直到遇到換行符('\n')。如果 readState 結構體中的 bulkLen 字段為零,則函數一直讀取,直到遇到換行符為止。如果 bulkLen 字段為非零值,則函數讀取由 bulkLen 值确定的特定位元組數。

如果讀取操作遇到錯誤,函數将傳回一個錯誤值以及部分讀取的資料。如果讀取操作成功,則函數檢查資料是否以回車符('\r')和換行符('\n')結尾。如果不是,則函數傳回一個表示協定錯誤的錯誤值。

如果讀取操作成功并且資料以回車符和換行符結尾,則函數将 bulkLen 字段設定為零,并傳回資料以及一個布爾值,表示讀取操作已完成(false)。

func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
  var msg []byte
  var err error
  if state.bulkLen == 0 {
    msg, err = bufReader.ReadBytes('\n')
    if err != nil {
      return nil, true, err
    }
    if len(msg) == 0 || msg[len(msg)-2] != '\r' {
      return nil, false, errors.New("protocol error:" + string(msg))
    }
  } else {
    msg = make([]byte, state.bulkLen+2)
    _, err = io.ReadFull(bufReader, msg)
    if err != nil {
      return nil, true, err
    }
    if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' {
      return nil, false, errors.New("protocol error:" + string(msg))
    }
    state.bulkLen = 0
  }
  return msg, false, nil
}           

▌3.6 parseMultiBulkHeader

定義parseMultiBulkHeader函數:

首先,該函數接收兩個參數:msg 和 state,其中 msg 是 Redis 傳回的多行字元串的頭部資訊,state 是目前解析狀态的一個結構體,用于存儲解析過程中的相關狀态資訊。

然後,該函數使用 strconv.ParseUint() 方法将頭部資訊中的數字解析成一個無符号整數,存儲在 expectedLine 變量中。如果解析失敗,則傳回一個錯誤。如果解析成功且得到的數字為 0,則表示 Redis 傳回的是一個空的多行字元串,此時将 expectedArgsCount 置為 0,并傳回。如果得到的數字大于 0,則表示 Redis 傳回的是一個非空的多行字元串,此時将 msgType 置為頭部資訊的第一個字元,表示該多行字元串的類型,将 readingMultiLine 置為 true,表示正在讀取多行字元串,将 expectedArgsCount 置為 expectedLine,表示該多行字元串中包含的行數,最後使用 make() 方法建立一個長度為 expectedLine 的空位元組數組切片 args,用于存儲解析得到的每一行字元串的位元組數組。

如果得到的數字小于 0,則表示頭部資訊格式不正确,将傳回一個錯誤。

需要注意的是,在 Redis 協定解析器中,多行字元串是由多個行組成的,每個行都是一個簡單字元串,多行字元串以 $ 符号開頭,後接一個數字,表示該多行字元串中行的數量,然後是多個以 \r\n 分隔的簡單字元串,每個簡單字元串以 $ 符号開頭,後接一個數字,表示該簡單字元串的長度,然後是該簡單字元串的内容。是以,解析多行字元串需要先解析頭部資訊,得到多行字元串中包含的行數,然後逐行解析。

func parseMultiBulkHeader(msg []byte, state *readState) error {
  var err error
  var expectedLine uint64
  expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
  if err != nil {
    return errors.New("protocol error: " + string(msg))
  }
  if expectedLine == 0 {
    state.expectedArgsCount = 0
    return nil
  } else if expectedLine > 0 {
    // first line of multi bulk reply
    state.msgType = msg[0]
    state.readingMultiLine = true
    state.expectedArgsCount = int(expectedLine)
    state.args = make([][]byte, 0, expectedLine)
    return nil
  } else {
    return errors.New("protocol error: " + string(msg))
  }
}           

▌3.7 parseBulkHeader

parseBulkHeader用于解析 Redis 傳回的簡單字元串和二進制安全字元串的長度資訊

首先,該函數接收兩個參數:msg 和 state,其中 msg 是 Redis 傳回的簡單字元串或二進制安全字元串的長度資訊,state 是目前解析狀态的一個結構體,用于存儲解析過程中的相關狀态資訊。

然後,該函數使用 strconv.ParseInt() 方法将長度資訊解析成一個有符号整數,存儲在 bulkLen 變量中。如果解析失敗,則傳回一個錯誤。如果解析成功且得到的數字為 -1,則表示 Redis 傳回的是一個空的簡單字元串或二進制安全字元串,此時直接傳回。如果得到的數字大于 0,則表示 Redis 傳回的是一個非空的二進制安全字元串,此時将 msgType 置為長度資訊的第一個字元,表示該字元串的類型,将 readingMultiLine 置為 true,表示正在讀取二進制安全字元串,将 expectedArgsCount 置為 1,表示該字元串隻包含一個元素,最後建立一個長度為 1 的空位元組數組切片 args,用于存儲解析得到的字元串。如果得到的數字小于 0,則表示長度資訊格式不正确,将傳回一個錯誤。

需要注意的是,在 Redis 協定解析器中,簡單字元串和二進制安全字元串的長度資訊以 $ 符号開頭,後接一個數字,表示字元串的長度,然後是字元串的内容。是以,解析簡單字元串和二進制安全字元串需要先解析長度資訊,得到字元串的長度,然後再讀取該長度的位元組數組,即為字元串的内容。

func parseBulkHeader(msg []byte, state *readState) error {
  var err error
  state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
  if err != nil {
    return errors.New("protocol error: " + string(msg))
  }
  if state.bulkLen == -1 { // null bulk
    return nil
  } else if state.bulkLen > 0 {
    state.msgType = msg[0]
    state.readingMultiLine = true
    state.expectedArgsCount = 1
    state.args = make([][]byte, 0, 1)
    return nil
  } else {
    return errors.New("protocol error: " + string(msg))
  }
}           

▌3.8 parseSingleLineReply

parseSingleLineReply用于解析 Redis 傳回的單行回複。以下是該函數的詳細解釋:

該函數接收一個位元組數組 msg,該位元組數組是 Redis 傳回的單行回複,函數将該位元組數組轉換為字元串,然後根據字元串的第一個字元,将單行回複的類型分為三種情況:狀态回複、錯誤回複和整數回複。

對于狀态回複,函數使用 strings.TrimSuffix() 方法去掉字元串末尾的 \r\n,然後調用 reply.MakeStatusReply() 方法建立一個狀态回複對象,将該對象作為結果傳回。

對于錯誤回複,函數也是使用 strings.TrimSuffix() 方法去掉字元串末尾的 \r\n,然後調用 reply.MakeErrReply() 方法建立一個錯誤回複對象,将該對象作為結果傳回。

對于整數回複,函數先将字元串轉換為有符号整數,然後調用 reply.MakeIntReply() 方法建立一個整數回複對象,将該對象作為結果傳回。如果轉換失敗,則說明傳回的單行回複格式不正确,函數将傳回一個錯誤。

需要注意的是,在 Redis 協定解析器中,單行回複以一種特定的格式傳回,即以一個特定字元開頭,後接一個字元串,最後以 \r\n 結尾。其中,以 + 開頭的字元串表示狀态回複,以 - 開頭的字元串表示錯誤回複,以 : 開頭的字元串表示整數回複。

func parseSingleLineReply(msg []byte) (resp.Reply, error) {
   str := strings.TrimSuffix(string(msg), "\r\n")
   var result resp.Reply
   switch msg[0] {
   case '+': // status reply
      result = reply.MakeStatusReply(str[1:])
   case '-': // err reply
      result = reply.MakeErrReply(str[1:])
   case ':': // int reply
      val, err := strconv.ParseInt(str[1:], 10, 64)
      if err != nil {
         return nil, errors.New("protocol error: " + string(msg))
      }
      result = reply.MakeIntReply(val)
   }
   return result, nil
}           

▌3.9 readBody

readBody用于讀取 Redis 傳回的多行字元串或二進制安全字元串中的每一行。以下是該函數的詳細解釋:

該函數接收兩個參數:msg 和 state,其中 msg 是 Redis 傳回的多行字元串或二進制安全字元串的一行内容,state 是目前解析狀态的一個結構體,用于存儲解析過程中的相關狀态資訊。

首先,該函數将 msg 中的内容去除末尾的 \r\n,并将結果存儲在 line 變量中。然後,函數判斷 line 的第一個字元是否為 $。如果是,則表示該行是一個二進制安全字元串的長度資訊,該函數使用 strconv.ParseInt() 方法将長度資訊解析成一個有符号整數,存儲在 bulkLen 變量中。如果解析失敗,則傳回一個錯誤。如果解析成功且得到的數字小于等于 0,則表示該行是一個空的二進制安全字元串,在多行字元串中,将空字元串添加到 args 中,并将 bulkLen 置為 0。如果得到的數字大于 0,則表示該行是一個非空的二進制安全字元串,将其添加到 args 中,并将 bulkLen 置為該字元串的長度。

如果 line 的第一個字元不是 $,則表示該行是一個簡單字元串,将其添加到 args 中。

最後,該函數傳回一個空的錯誤。

需要注意的是,在 Redis 協定解析器中,多行字元串是由多個行組成的,每個行都是一個簡單字元串或二進制安全字元串,多行字元串以 $ 符号開頭,後接一個數字,表示該多行字元串中行的數量,然後是多個以 \r\n 分隔的簡單字元串或二進制安全字元串,每個簡單字元串以 $ 符号開頭,後接一個數字,表示該簡單字元串或二進制安全字元串的長度,然後是該簡單字元串或二進制安全字元串的内容。是以,解析多行字元串需要先解析頭部資訊,得到多行字元串中包含的行數,然後逐行解析。

func readBody(msg []byte, state *readState) error {
  line := msg[0 : len(msg)-2]
  var err error
  if line[0] == '#39; {
    // bulk reply
    state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
    if err != nil {
      return errors.New("protocol error: " + string(msg))
    }
    if state.bulkLen <= 0 { // null bulk in multi bulks
      state.args = append(state.args, []byte{})
      state.bulkLen = 0
  }
} else {
  state.args = append(state.args, line)
}
return nil
}           

▌3.10 parse0()

parse0()用于解析 Redis 傳回的資料流并将解析結果發送到一個資料通道。以下是該函數的詳細解釋:

該函數接收兩個參數:reader 和 ch,其中 reader 是一個實作了 io.Reader 接口的對象,用于從 Redis 伺服器讀取資料流,ch 是一個通道,用于發送解析結果。

首先,該函數建立一個帶有緩沖區的讀取器 bufReader,用于從 reader 中讀取資料流。然後建立一個初始狀态 state,用于存儲解析過程中的相關狀态資訊。接着進入一個無限循環,并在每次循環中執行以下操作:

  • 讀取一行資料。該函數調用readLine()方法從 bufReader 中讀取一行資料,并将讀取的結果和錯誤資訊存儲在 msg、ioErr 和 err 變量中。如果讀取成功,則将 msg 傳遞給解析函數進行解析,否則根據 ioErr 判斷是否遇到了 I/O 錯誤,如果是,則關閉資料通道并傳回;否則說明遇到了協定錯誤,将錯誤資訊發送到資料通道并重置解析狀态 state,然後繼續下一輪循環。
  • 解析一行資料。根據讀取到的資料 msg 和解析狀态 state,該函數調用不同的解析函數進行解析。如果 state.readingMultiLine 為 false,則表示正在解析一個新的回複,函數将根據第一個字元判斷回複類型并調用相應的解析函數進行解析。如果 state.readingMultiLine 為 true,則表示正在解析一個多行回複中的一行,函數将調用 readBody() 方法讀取該行的内容并存儲在解析狀态 state 中,如果該多行回複中的所有行都已經讀取完畢,則将解析得到的結果發送到資料通道,并重置解析狀态 state,然後繼續下一輪循環。

需要注意的是,在 Redis 協定解析器中,資料流由多個回複組成,每個回複由多個行組成。多行回複以 $ 符号開頭,後接一個數字,表示該多行回複中行的數量,然後是多個以 \r\n 分隔的簡單字元串或二進制安全字元串。簡單字元串和二進制安全字元串以 $ 符号開頭,後接一個數字,表示該簡單字元串或二進制安全字元串的長度,然後是該簡單字元串或二進制安全字元串的内容。單行回複以一種特定的格式傳回,即以一個特定字元開頭,後接一個字元串,最後以 \r\n 結尾。是以,解析資料流需要先解析頭部資訊,得到回複類型和行數,然後逐行解析。

func parse0(reader io.Reader, ch chan<- *Payload) {
  defer func() {
    if err := recover(); err != nil {
      logger.Error(string(debug.Stack()))
    }
  }()
  bufReader := bufio.NewReader(reader)
  var state readState
  var err error
  var msg []byte
  for {
    // read line
    var ioErr bool
    msg, ioErr, err = readLine(bufReader, &state)
    if err != nil {
      if ioErr { // encounter io err, stop read
        ch <- &Payload{
          Err: err,
        }
        close(ch)
        return
      }
      // protocol err, reset read state
      ch <- &Payload{
        Err: err,
      }
      state = readState{}
      continue
    }

    // parse line
    if !state.readingMultiLine {
      // receive new response
      if msg[0] == '*' {
        // multi bulk reply
        err = parseMultiBulkHeader(msg, &state)
        if err != nil {
          ch <- &Payload{
            Err: errors.New("protocol error: " + string(msg)),
          }
          state = readState{} // reset state
          continue
        }
        if state.expectedArgsCount == 0 {
          ch <- &Payload{
            Data: &reply.EmptyMultiBulkReply{},
          }
          state = readState{} // reset state
          continue
        }
      } else if msg[0] == '#39; { // bulk reply
        err = parseBulkHeader(msg, &state)
        if err != nil {
          ch <- &Payload{
            Err: errors.New("protocol error: " + string(msg)),
          }
          state = readState{} // reset state
          continue
        }
        if state.bulkLen == -1 { // null bulk reply
          ch <- &Payload{
            Data: &reply.NullBulkReply{},
          }
          state = readState{} // reset state
          continue
        }
      } else {
        // single line reply
        result, err := parseSingleLineReply(msg)
        ch <- &Payload{
          Data: result,
            Err:  err,
        }
        state = readState{} // reset state
        continue
      }
    } else {
      // receive following bulk reply
      err = readBody(msg, &state)
      if err != nil {
        ch <- &Payload{
          Err: errors.New("protocol error: " + string(msg)),
        }
        state = readState{} // reset state
        continue
      }
      // if sending finished
      if state.finished() {
        var result resp.Reply
        if state.msgType == '*' {
          result = reply.MakeMultiBulkReply(state.args)
        } else if state.msgType == '#39; {
          result = reply.MakeBulkReply(state.args[0])
        }
        ch <- &Payload{
          Data: result,
            Err:  err,
        }
        state = readState{}
      }
    }
  }
}           

▌4、實作Connection

這一節,我們要寫一個新的Handler讓它處理使用者發過來的請求,把這個請求轉發給解析器,讓解析器去解析。

▌4.1 Connection結構體

這個結構體代表我們的協定層,對每一個連接配接上的用戶端的描述

  • conn類型為net.Conn:這個字段是一個網絡套接字的連接配接。
  • waitingReply類型為wait.Wait:這個字段是一個Wait結構體的執行個體,可能用于同步并發通路連接配接。
  • mu類型為sync.Mutex:這個字段是一個互斥鎖,用于同步對連接配接的通路。
  • selectDB類型為int:這個字段存儲目前標明資料庫的索引。
type Connection struct {
  conn         net.Conn
  waitingReply wait.Wait
  mu           sync.Mutex
  selectDB     int
}           

▌4.2 close方法

close方法的作用是關閉與此連接配接相關聯的網絡連接配接。具體來說,它首先調用waitingReply字段上的WaitWithTimeout()方法,該方法将等待所有正在進行的操作完成,或者在10秒後逾時。然後它調用conn字段上的Close()方法來關閉連接配接。

最後,它傳回nil表示沒有錯誤發生。如果在關閉連接配接時發生錯誤,将傳回一個非nil的error值,以訓示關閉連接配接時出現了問題。

是以,這個Close方法允許使用者以安全的方式關閉連接配接,并等待所有正在進行的操作完成。逾時機制可以確定在某些情況下不會無限期地等待連接配接關閉,進而導緻程式鎖死。

func (c *Connection) Close() error {
  c.waitingReply.WaitWithTimeout(10 * time.Second)
  _ = c.conn.Close()
  return nil
}           

▌4.3 RemoteAddr

方法的作用是傳回與此連接配接相關聯的遠端網絡位址。它通過調用conn字段上的RemoteAddr()方法來實作,該方法也傳回一個net.Addr類型的值。

是以,這個RemoteAddr方法允許使用者擷取連接配接的遠端網絡位址,以便進行網絡程式設計中的相關操作,例如确定連接配接是從哪個主機發起的。

func (c *Connection) RemoteAddr() net.Addr {
  return c.conn.RemoteAddr()
}           

▌4.4 Write

Write是Connection結構體類型的一個方法。這個方法的名稱是Write,它接受一個[]byte類型的參數b,并傳回一個error類型的值。

該方法的作用是向與此連接配接相關聯的網絡連接配接寫入資料。具體來說,它首先檢查b的長度是否為0,如果是,則直接傳回nil表示沒有錯誤發生。否則,它會在調用conn字段上的Write()方法将資料寫入連接配接之前,使用互斥鎖和等待組來同步并發通路連接配接。

在同步代碼塊内部,它将等待組的計數增加1,以表示有一個新的操作正在進行。然後,它使用defer語句在函數傳回時逆序地将等待組的計數減少1,并釋放互斥鎖以允許其他goroutine通路連接配接。

最後,它調用conn字段上的Write()方法來将資料寫入連接配接,并傳回可能發生的錯誤。

是以,這個Write方法允許使用者向連接配接寫入資料,同時確定在并發通路連接配接時是安全的。等待組和互斥鎖可確定多個goroutine不會同時寫入連接配接,并且defer語句可確定在寫操作完成或發生錯誤後正确地釋放資源。

func (c *Connection) Write(b []byte) error {
  if len(b) == 0 {
    return nil
  }
  c.mu.Lock()
  c.waitingReply.Add(1)
  defer func() {
    c.waitingReply.Done()
    c.mu.Unlock()
  }()

  _, err := c.conn.Write(b)
  return err
}           

▌5、實作RespHandler

将使用者發過來的封包解析成實際的指令,以便用于通信

建立檔案resp/handler/handler.go和interface/database/database.go

▌5.1 定義Database 接口

type CmdLine = [][]byte

type Database interface {
  Exec(client resp.Connection, args [][]byte) resp.Reply
  Close()
  AfterClientClose(c resp.Connection)
}

type DataEntity struct {
  Data interface{}
}           
  • CmdLine是一個别名類型,它實際上是一個由[]byte類型的切片組成的二維切片。這個類型可能被用來表示一行指令,其中每個位元組切片代表一個參數。
  • Database是一個接口類型,它定義了與資料庫相關的方法。具體來說,它有三個方法:Exec用于執行指令,Close用于關閉資料庫,AfterClientClose用于處理用戶端連接配接關閉事件。
  • Exec方法接受一個resp.Connection類型的參數client,表示與用戶端相關聯的連接配接,以及一個[][]byte類型的參數args,表示指令參數。它傳回一個resp.Reply類型的值,表示指令的執行結果。
  • Close方法用于關閉資料庫,沒有參數和傳回值。
  • AfterClientClose方法用于處理用戶端連接配接關閉事件,接受一個resp.Connection類型的參數表示已關閉的連接配接。
  • DataEntity是一個結構體類型,它具有一個名為Data的公共字段,類型為interface{}。這個類型可能被用來表示資料庫中的資料實體,其中Data字段可以包含任何類型的資料。

▌5.2 RespHandler結構體

◆5.2.1 RespHandler

定義結構體RespHandler。它有幾個字段:

  • activeConn類型為sync.Map:這個字段是一個并發安全的映射,用于存儲目前處于活動狀态的連接配接。
  • closing類型為atomic.Boolean:這個字段是一個原子布爾類型,用于表示伺服器是否正在關閉。
  • db類型為databaseface.Database:這個字段是一個實作了Database接口的對象,表示與此響應處理程式關聯的資料庫。

該結構體是一個響應處理程式,用于處理來自用戶端的請求。activeConn字段用于跟蹤目前處于活動狀态的連接配接,以便在關閉伺服器時關閉這些連接配接。closing字段可以用于在關閉伺服器時通知其他goroutine停止處理新的請求。db字段則表示與此響應處理程式關聯的資料庫。

其中RespHandler用于處理用戶端發送的Redis協定請求,并與一個實作了Database接口的對象進行互動來處理這些請求。

type RespHandler struct {
  activeConn sync.Map
  closing    atomic.Boolean
  db         databaseface.Database
}           

◆5.2.2 Close()方法

關閉協定層,即關閉整個redis

該方法的作用是關閉響應處理程式,并關閉所有目前處于活動狀态的連接配接。具體來說,它首先将closing字段設定為true,表示伺服器正在關閉。然後,它使用activeConn字段中存儲的連接配接清單來關閉所有目前處于活動狀态的連接配接。在關閉每個連接配接之前,它調用client.Close()方法來關閉連接配接。

最後,它調用db字段上的Close()方法來關閉與響應處理程式關聯的資料庫,并傳回nil表示沒有錯誤發生。

需要注意的是,在關閉連接配接時,activeConn字段是一個并發安全的映射,是以需要使用Range()方法來周遊所有目前處于活動狀态的連接配接

func (h *RespHandler) Close() error {
  logger.Info("handler shutting down...")
  h.closing.Set(true)
  // TODO: concurrent wait
  h.activeConn.Range(func(key interface{}, val interface{}) bool {
    client := key.(*connection.Connection)
    _ = client.Close()
    return true
  })
  h.db.Close()
return nil
}           

◆5.2.3 closeClient方法

該方法的作用是關閉一個用戶端連接配接,并從activeConn字段中删除它。具體來說,它首先調用client.Close()方法來關閉連接配接。然後,它調用db字段上的AfterClientClose()方法來處理用戶端連接配接關閉事件。最後,它使用activeConn字段的Delete()方法将連接配接從活動連接配接清單中删除。

是以,這個closeClient方法允許響應處理程式在需要關閉某個用戶端連接配接時,以正确的方式關閉該連接配接,并從活動連接配接清單中删除它。

func (h *RespHandler) closeClient(client *connection.Connection) {
  _ = client.Close()
  h.db.AfterClientClose(client)
  h.activeConn.Delete(client)
}           

◆5.2.4 實作Handle方法

Handle方法的作用是處理來自用戶端的請求,直到連接配接關閉為止。具體來說,它首先檢查伺服器是否正在關閉,如果是,則拒絕新的連接配接并關閉連接配接。然後,它建立一個新的Connection結構體,并将其與給定的網絡連接配接相關聯,并将連接配接添加到activeConn字段中以跟蹤它是活動的。

接下來,它使用parser.ParseStream()方法從連接配接中解析出一個個請求,并對每個請求進行處理。如果解析時發生錯誤,則它會将錯誤回複發送給用戶端,并關閉連接配接。如果請求沒有資料,則它會記錄一個錯誤并繼續處理下一個請求。

如果請求是一個正确格式的多批量回複,則它将調用db字段上的Exec()方法來執行該指令,并将結果寫回用戶端。如果執行結果為空,則它将寫入一個未知錯誤回複。

最後,在處理每個請求時,它都會檢查連接配接是否已經關閉。如果是,則它将關閉連接配接,并從activeConn字段中删除它。

是以,這個Handle方法允許響應處理程式處理來自用戶端的請求,并在用戶端關閉連接配接時正确地關閉連接配接。它還確定在伺服器關閉時,不會接受新的連接配接。

func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) {
  if h.closing.Get() {
    // closing handler refuse new connection
    _ = conn.Close()
  }

  client := connection.NewConn(conn)
  h.activeConn.Store(client, 1)

  ch := parser.ParseStream(conn)
  for payload := range ch {
    if payload.Err != nil {
      if payload.Err == io.EOF ||
        payload.Err == io.ErrUnexpectedEOF ||
        strings.Contains(payload.Err.Error(), "use of closed network connection") {
        // connection closed
        h.closeClient(client)
        logger.Info("connection closed: " + client.RemoteAddr().String())
        return
      }
      // protocol err
      errReply := reply.MakeErrReply(payload.Err.Error())
      err := client.Write(errReply.ToBytes())
      if err != nil {
        h.closeClient(client)
        logger.Info("connection closed: " + client.RemoteAddr().String())
        return
      }
      continue
    }
    if payload.Data == nil {
      logger.Error("empty payload")
      continue
    }
    r, ok := payload.Data.(*reply.MultiBulkReply)
    if !ok {
      logger.Error("require multi bulk reply")
      continue
    }
    result := h.db.Exec(client, r.Args)
    if result != nil {
      _ = client.Write(result.ToBytes())
    } else {
      _ = client.Write(unknownErrReplyBytes)
    }
  }
}           

▌6、測試

▌6.1 建立一個EchoDatabase,它實作了database.Database接口:

type EchoDatabase struct {
}

func NewEchoDatabase() *EchoDatabase {
   return &EchoDatabase{}
}

func (e EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply {
   return reply.MakeMultiBulkReply(args)

}

func (e EchoDatabase) AfterClientClose(c resp.Connection) {
   logger.Info("EchoDatabase AfterClientClose")
}

func (e EchoDatabase) Close() {
   logger.Info("EchoDatabase Close")
}
           

▌6.2 定義方法MakeHandler

調用NewEchoDatabase,這樣他就持有了一個最簡單的redis核心,他會把接收到的指令回發出去

func MakeHandler() *RespHandler {
   var db databaseface.Database
   db = database.NewEchoDatabase()
   return &RespHandler{
      db: db,
   }
}
           

*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n

從0開始,手寫Redis

▌實作記憶體資料庫:

本章主要介紹如何實作記憶體KV資料庫。

▌1、定義Dict接口

建立檔案datastruct/dict/dict.go,定義Dict

該接口定義,包含了常見的字典操作方法。下面對每個方法進行簡要說明:

  • Get(key string) (val interface{}, exists bool):根據鍵擷取值,傳回值和是否存在的布爾值。
  • Len():擷取字典中鍵值對的數量。
  • Put(key string, val interface{}) (result int):存儲一個鍵值對,并傳回字典中鍵值對的數量。
  • PutIfAbsent(key string, val interface{}) (result int):隻有當鍵不存在時才存儲一個鍵值對,并傳回字典中鍵值對的數量。
  • PutIfExists(key string, val interface{}) (result int):隻有當鍵存在時才存儲一個鍵值對,并傳回字典中鍵值對的數量。
  • Remove(key string) (result int):根據鍵删除一個鍵值對,并傳回字典中鍵值對的數量。
  • ForEach(consumer Consumer):周遊字典,并以每個鍵值對作為參數調用指定的方法。
  • Keys() []string:擷取字典中所有鍵的清單。
  • RandomKeys(limit int) []string:擷取一個随機的鍵清單,數量不超過指定的限制。
  • RandomDistinctKeysKeys(limit int) []string:擷取一個随機的不重複鍵清單,數量不超過指定的限制。
  • clear():清空字典中的所有鍵值對。
type Dict interface {
    Get(key string) (val interface{}, exsits bool)
    Len()
    Put(key string, val interface{}) (result int)
    PutIfAbsent(key string, val interface{}) (result int)
    PutIfExists(key string, val interface{}) (result int)
    Remove(key string) (result int)
    ForEach(consumer Consumer)
    Keys() []string
    RandomKeys(limit int) []string
    RandomDistinctKeysKeys(limit int) []string
    clear()
}           
type Consumer func(key string, val interface{}) bool           

▌2、為Dict接口寫一個實作

▌2.1 SyncDict結構體

該結構體定義了一個并發安全的字典,使用了Go語言标準庫中的sync.Map類型。

sync.Map是Go語言中的一個并發安全的字典類型,用于在多個goroutine之間共享資料。sync.Map的内部實作使用了一種特殊的算法,可以在無需使用鎖的情況下保證并發安全性。

SyncDict結構體中包含了一個sync.Map類型的成員變量m,用于存儲鍵值對。通過使用sync.Map類型,SyncDict類型可以提供并發安全的讀寫操作,避免在多個goroutine中出現資料競争的問題。

type SyncDict struct {
  m sync.Map
}           

下面為SyncDict實作Dict

▌2.2 Get方法

該方法接受一個參數key,表示要擷取的鍵。該方法傳回兩個值,一個是鍵對應的值val,另一個是表示鍵是否存在的布爾值exists。

在方法内部,使用sync.Map中的Load方法來擷取指定鍵的值。如果鍵存在,則傳回該鍵對應的值和true;如果鍵不存在,則傳回nil和false。

因為SyncDict類型中的m成員變量是一個sync.Map類型的變量,是以可以直接使用sync.Map類型的方法Load來擷取指定鍵的值。由于sync.Map類型是并發安全的,是以在多個goroutine之間調用該方法時,也可以確定并發安全性。

func (dict *SyncDict) Get(key string) (val interface{}, exsits bool) {
  val, ok := dict.m.Load(key)
  return val, ok
}           

▌2.3 Put方法

Put方法用于向SyncDict中添加一個鍵值對。

該方法接受兩個參數,一個是key,表示要添加的鍵,另一個是val,表示要添加的值。該方法傳回一個int類型的值,表示添加操作的結果。如果添加的鍵之前已經存在,則傳回0;如果添加的鍵是新的,則傳回1。

在方法内部,首先使用sync.Map中的Load方法來判斷要添加的鍵是否已經存在。如果鍵已經存在,則将新的值覆寫舊的值;如果鍵是新的,則使用sync.Map中的Store方法來添加新的鍵值對。

最後,根據鍵是否已經存在來傳回相應的結果。

由于SyncDict類型中的m成員變量是一個sync.Map類型的變量,是以可以直接使用sync.Map類型的方法Load和Store來判斷鍵是否已經存在,以及添加鍵值對。由于sync.Map類型是并發安全的,是以在多個goroutine之間調用該方法時,也可以確定并發安全性。

func (dict *SyncDict) Put(key string, val interface{}) (result int) {
  _, existed := dict.m.Load(key)
  dict.m.Store(key, val)
  if existed {
    return 0
  }
  return 1
}           

▌2.4 Len方法

Len方法用于擷取SyncDict中鍵值對的數量。

該方法不接受任何參數,傳回一個int類型的值,表示SyncDict中鍵值對的數量。

在方法内部,首先定義一個變量length,用于記錄鍵值對的數量。然後使用sync.Map中的Range方法周遊所有的鍵值對,對每個鍵值對調用一個函數,該函數将length加1。最後傳回length的值,即為SyncDict中鍵值對的數量。

由于SyncDict類型中的m成員變量是一個sync.Map類型的變量,是以可以直接使用sync.Map類型的方法Range來周遊所有的鍵值對,并對每個鍵值對進行處理。由于sync.Map類型是并發安全的,是以在多個goroutine之間調用該方法時,也可以確定并發安全性。

func (dict *SyncDict) Len() int {
  length := 0
  dict.m.Range(func(key, value interface{}) bool {
    length++
    return true
  })
  return length
}           

▌2.5 PutIfAbsent方法

PutIfAbsent用于向SyncDict中添加一個鍵值對,但僅當指定的鍵不存在時才添加。

該方法接受兩個參數,一個是key,表示要添加的鍵,另一個是val,表示要添加的值。該方法傳回一個int類型的值,表示添加操作的結果。如果添加的鍵之前已經存在,則傳回0;如果添加的鍵是新的,則傳回1。

在方法内部,首先使用sync.Map中的Load方法來判斷要添加的鍵是否已經存在。如果鍵已經存在,則直接傳回0;如果鍵是新的,則使用sync.Map中的Store方法來添加新的鍵值對。

最後,根據鍵是否已經存在來傳回相應的結果。

由于SyncDict類型中的m成員變量是一個sync.Map類型的變量,是以可以直接使用sync.Map類型的方法Load和Store來判斷鍵是否已經存在,以及添加鍵值對。由于sync.Map類型是并發安全的,是以在多個goroutine之間調用該方法時,也可以確定并發安全性。

func (dict *SyncDict) PutIfAbsent(key string, val interface{}) (result int) {
  _, existed := dict.m.Load(key)
  if existed {
    return 0
  }
  dict.m.Store(key, val)
  return 1
}           

▌2.6 PutIfExists方法

PutIfExists用于向SyncDict中更新一個已存在的鍵值對,如果鍵不存在則不進行任何操作。

該方法接受兩個參數,一個是key,表示要更新的鍵,另一個是val,表示要更新的值。該方法傳回一個int類型的值,表示更新操作的結果。如果更新的鍵存在,則傳回1;如果更新的鍵不存在,則傳回0。

在方法内部,首先使用sync.Map中的Load方法來判斷要更新的鍵是否已經存在。如果鍵已經存在,則使用sync.Map中的Store方法來更新鍵值對,并傳回1;如果鍵不存在,則直接傳回0。

由于SyncDict類型中的m成員變量是一個sync.Map類型的變量,是以可以直接使用sync.Map類型的方法Load和Store來判斷鍵是否已經存在,以及更新鍵值對。由于sync.Map類型是并發安全的,是以在多個goroutine之間調用該方法時,也可以確定并發安全性。

func (dict *SyncDict) PutIfExists(key string, val interface{}) (result int) {
  _, existed := dict.m.Load(key)
  if existed {
    dict.m.Store(key, val)
    return 1
  }
  return 0
}           

▌2.7 Remove方法

該方法用于從SyncDict中删除指定的鍵值對。

該方法接受一個參數key,表示要删除的鍵。該方法傳回一個int類型的值,表示删除操作的結果。如果要删除的鍵存在,則傳回1;如果要删除的鍵不存在,則傳回0。

在方法内部,首先使用sync.Map中的Load方法來判斷要删除的鍵是否已經存在。如果鍵已經存在,則使用sync.Map中的Delete方法來删除鍵值對,并傳回1;如果鍵不存在,則直接傳回0。

由于SyncDict類型中的m成員變量是一個sync.Map類型的變量,是以可以直接使用sync.Map類型的方法Load和Delete來判斷鍵是否已經存在,以及删除鍵值對。由于sync.Map類型是并發安全的,是以在多個goroutine之間調用該方法時,也可以確定并發安全性。

func (dict *SyncDict) Remove(key string) (result int) {
  _, existed := dict.m.Load(key)
  dict.m.Delete(key)
  if existed {
    return 1
  }
  return 0
}           

▌2.8 ForEach方法

該方法用于對SyncDict中的所有鍵值對進行周遊,并對每個鍵值對進行操作。

該方法接受一個參數consumer,類型為Consumer。Consumer是一個函數類型,接受兩個參數,一個是string類型的鍵,另一個是interface{}類型的值,表示對鍵值對進行的操作。

在方法内部,使用sync.Map中的Range方法周遊所有的鍵值對,對每個鍵值對調用一個函數。該函數将鍵強制類型轉換為string類型,并将鍵和值作為參數傳遞給consumer函數進行處理。如果consumer函數傳回true,則繼續周遊下一個鍵值對;如果consumer函數傳回false,則停止周遊。

由于SyncDict類型中的m成員變量是一個sync.Map類型的變量,是以可以直接使用sync.Map類型的方法Range來周遊所有的鍵值對,并對每個鍵值對進行操作。由于sync.Map類型是并發安全的,是以在多個goroutine之間調用該方法時,也可以確定并發安全性。

func (dict *SyncDict) ForEach(consumer Consumer) {
  dict.m.Range(func(key, value interface{}) bool {
    consumer(key.(string), value)
    return true
  })
}           

▌2.9 Keys方法

Keys方法用于擷取SyncDict中所有鍵的一個切片。

該方法不接受任何參數,傳回一個[]string類型的切片,其中包含了SyncDict中所有鍵的值。

在方法内部,首先建立一個[]string類型的切片result,切片長度為SyncDict中鍵值對的數量。然後使用sync.Map中的Range方法周遊所有的鍵值對,對每個鍵值對調用一個函數。該函數将鍵強制類型轉換為string類型,并将鍵添加到result切片中。最後傳回result切片即可。

由于SyncDict類型中的m成員變量是一個sync.Map類型的變量,是以可以直接使用sync.Map類型的方法Range來周遊所有的鍵值對,并擷取所有的鍵。由于sync.Map類型是并發安全的,是以在多個goroutine之間調用該方法時,也可以確定并發安全性。

func (dict *SyncDict) Keys() []string {
  result := make([]string, dict.Len())
  i := 0
  dict.m.Range(func(key, value interface{}) bool {
    result[i] = key.(string)
    i++
    return true
  })
  return result
}           

▌2.10 RandomKeys方法

RandomKeys方法用于擷取SyncDict中随機選取的若幹個鍵的一個切片。

該方法接受一個參數limit,表示要擷取的鍵的數量。方法傳回一個[]string類型的切片,其中包含了随機選取的limit個鍵的值。

在方法内部,首先建立一個[]string類型的切片result,切片長度為SyncDict中鍵值對的數量。然後使用一個for循環,每次循環随機選取一個鍵,并将該鍵添加到result切片中。

具體地,每次循環使用sync.Map中的Range方法周遊所有的鍵值對。在周遊過程中,将每個鍵添加到result切片中,并通過傳回false來停止周遊。由于Range方法的周遊順序是随機的,是以每次周遊得到的鍵也是随機的。循環limit次後,傳回result切片即可。

需要注意的是,如果SyncDict中的鍵值對數量小于limit,則傳回的切片長度将小于limit。如果需要擷取的鍵的數量大于SyncDict中鍵值對的數量,則會重複選取已有的鍵。

func (dict *SyncDict) RandomKeys(limit int) []string {
  result := make([]string, dict.Len())
  for i := 0; i < limit; i++ {
    dict.m.Range(func(key, value interface{}) bool {
      result[i] = key.(string)
      return false
    })
  }
  return result
}           

▌2.11 RandomDistinctKeysKeys方法

RandomDistinctKeysKeys方法用于擷取SyncDict中随機選取的若幹個不重複鍵的一個切片。

該方法接受一個參數limit,表示要擷取的鍵的數量。方法傳回一個[]string類型的切片,其中包含了随機選取的limit個不重複鍵的值。

在方法内部,首先建立一個[]string類型的切片result,切片長度為SyncDict中鍵值對的數量。然後使用sync.Map中的Range方法周遊所有的鍵值對,将每個鍵添加到result切片中。同時,在周遊過程中,判斷result切片中已經包含的鍵的數量是否達到了limit。如果已經達到了limit,則通過傳回false來停止周遊;否則,繼續周遊。

接下來,對result切片進行洗牌操作,打亂其中的元素順序。然後傳回result切片的前limit個元素,即為随機選取的limit個不重複鍵的值。

需要注意的是,如果SyncDict中的鍵值對數量小于limit,則傳回的切片長度将小于limit。如果需要擷取的鍵的數量大于SyncDict中鍵值對的數量,則會重複選取已有的鍵。

func (dict *SyncDict) RandomDistinctKeysKeys(limit int) []string {
  result := make([]string, dict.Len())
  i := 0
  dict.m.Range(func(key, value interface{}) bool {
    result[i] = key.(string)
    i++
    if i == limit {
      return false
    }
    return true
  })
  return result
}           

▌3、DB結構體

sync.Map是redis最底層存儲結構,我們儲存資料的結果到這一層,那它的上一層是什麼?是DB,用過redis的同學都知道,預設redis有16個DB,你可以認為是16個分資料庫。 是以需要在database這個檔案夾裡面建立一個叫做DB的資料結構。

▌3.1 DB結構體

該結構體表示一個簡單的鍵值資料庫。

該結構體有兩個成員變量。第一個成員變量是index,表示目前資料庫中最後一個鍵的索引值。第二個成員變量是data,是一個dict.Dict類型的變量,表示存儲所有鍵值對的字典。dict.Dict是一個自定義的字典類型,用于實作線程安全的鍵值對存儲。在該結構體中,鍵是一個string類型的值,值是一個DataEntity類型的值,DataEntity是一個自定義的資料實體類型,用于存儲鍵值對的值。

該結構體可以用于實作一個簡單的鍵值資料庫,可以存儲任意類型的資料。可以通過向data成員變量中添加鍵值對,實作資料的存儲。可以通過查詢鍵值對的鍵擷取對應的值,實作資料的讀取。同時,由于使用了dict.Dict類型的變量實作存儲,是以可以保證在多個goroutine之間并發通路時的線程安全性。

type DB struct {
  index int
  data dict.Dict
}           

▌3.2 ExecFunc接口

定義ExecFunc接口,表示一個執行函數。

該函數類型接受兩個參數。第一個參數是一個指向DB類型的指針,表示一個鍵值資料庫。第二個參數是一個[][]byte類型的切片,表示該函數的參數清單。函數需要根據參數清單和鍵值資料庫進行一些操作,并傳回一個resp.Reply類型的值,表示操作的結果。

該函數類型可以用于定義鍵值資料庫中的操作函數,例如擷取鍵值、設定鍵值、删除鍵值等。在使用時,可以将函數定義為ExecFunc類型的函數,并将其作為參數傳遞給一個執行函數,該執行函數可以根據傳入的參數調用相應的操作函數,并傳回操作結果。

需要注意的是,由于ExecFunc類型的函數可以通路一個DB類型的變量,是以可以對鍵值資料庫進行修改操作。是以,在使用時需要注意線程安全性,以避免多個goroutine并發修改同一個鍵值資料庫時出現的問題。

type ExecFunc func(db *DB, args [][]byte) resp.Reply           

▌3.3 Exec方法

該方法用于執行Redis協定中的指令,并傳回指令執行的結果。

該方法接受兩個參數,第一個參數是一個resp.Connection類型的值,表示一個Redis用戶端連接配接。第二個參數是一個[][]byte類型的切片,表示Redis協定中的指令行參數清單。

在方法内部,首先根據指令行參數清單的第一個元素擷取對應的指令處理函數。如果找不到對應的指令處理函數,則傳回一個錯誤回複,表示未知指令。如果找到了對應的指令處理函數,則根據指令的參數個數檢查指令行參數清單的長度是否符合要求。如果參數個數不符合要求,則傳回一個錯誤回複,表示參數數量錯誤。

接着,調用指令處理函數,将DB類型的指針和除指令名以外的參數清單作為參數傳遞給該函數,執行指令操作,并傳回執行結果。

該方法可用于實作Redis協定的指令執行,可以根據指令名和參數清單調用相應的指令處理函數,并傳回執行結果。需要注意的是,由于該方法可以通路一個DB類型的變量,是以可以對鍵值資料庫進行讀寫操作。在使用時需要注意線程安全性,以避免多個goroutine并發通路同一個鍵值資料庫時出現的問題。

func (db *DB) Exec(c resp.Connection, cmdLine [][]byte) resp.Reply {

 cmdName := strings.ToLower(string(cmdLine[0]))
 cmd, ok := cmdTable[cmdName]
 if !ok {
  return reply.MakeErrReply("ERR unknown command '" + cmdName + "'")
 }
 if !validateArity(cmd.arity, cmdLine) {
  return reply.MakeArgNumErrReply(cmdName)
 }
 fun := cmd.executor
 return fun(db, cmdLine[1:])
}           

▌3.4 GetEntity方法

該方法用于擷取鍵值資料庫中指定鍵的資料實體。

該方法接受一個參數key,表示要擷取的資料實體的鍵。方法傳回兩個值,第一個值是一個指向DataEntity類型的指針,表示擷取到的資料實體。如果找不到指定鍵的資料實體,則該指針為nil。第二個值是一個bool類型的值,表示是否成功擷取到資料實體。如果成功擷取到資料實體,則該值為true;否則,該值為false。

在方法内部,首先使用db.data.Get()方法從鍵值資料庫中擷取指定鍵的值。如果找不到指定鍵的值,則傳回nil和false。如果找到了指定鍵的值,則将其轉換為DataEntity類型的指針,然後将該指針和true作為傳回值傳回。

該方法可用于從鍵值資料庫中擷取指定鍵的資料實體,可以根據需要在程式中調用該方法進行查詢操作。需要注意的是,由于該方法可以通路一個DB類型的變量,是以可以對鍵值資料庫進行讀操作。在使用時需要注意線程安全性,以避免多個goroutine并發通路同一個鍵值資料庫時出現的問題。

func (db *DB) GetEntity(key string) (*database.DataEntity, bool) {

   raw, ok := db.data.Get(key)
   if !ok {
      return nil, false
   }
   entity, _ := raw.(*database.DataEntity)
   return entity, true
}           

▌3.5 PutEntity方法

該方法用于向鍵值資料庫中添加一個鍵值對,其中鍵為key,值為entity。

該方法接受兩個參數,第一個參數是一個string類型的值,表示要添加的鍵。第二個參數是一個指向DataEntity類型的指針,表示要添加的值。該方法傳回一個int類型的值,表示添加操作的結果。如果添加成功,則傳回一個大于等于0的整數,表示添加的鍵值對數量;如果添加失敗,則傳回一個小于0的整數,表示錯誤代碼。

在方法内部,使用db.data.Put()方法向鍵值資料庫中添加指定的鍵值對。如果添加成功,則傳回一個大于等于0的整數,表示添加的鍵值對數量;如果添加失敗,則傳回一個小于0的整數,表示錯誤代碼。

該方法可用于向鍵值資料庫中添加指定的鍵值對,可以根據需要在程式中調用該方法進行添加操作。需要注意的是,由于該方法可以通路一個DB類型的變量,是以可以對鍵值資料庫進行寫操作。在使用時需要注意線程安全性,以避免多個goroutine并發通路同一個鍵值資料庫時出現的問題。

func (db *DB) PutEntity(key string, entity *database.DataEntity) int {
   return db.data.Put(key, entity)
}           

▌3.6 PutIfExists方法

該方法用于向鍵值資料庫中添加一個鍵值對,隻有在指定鍵已經存在時才執行添加操作。

該方法接受兩個參數,第一個參數是一個string類型的值,表示要添加的鍵。第二個參數是一個指向DataEntity類型的指針,表示要添加的值。該方法傳回一個int類型的值,表示添加操作的結果。如果添加成功,則傳回一個大于等于0的整數,表示添加的鍵值對數量;如果添加失敗,則傳回一個小于0的整數,表示錯誤代碼。

在方法内部,使用db.data.PutIfExists()方法向鍵值資料庫中添加指定的鍵值對。該方法隻有在指定鍵已經存在時才執行添加操作。如果添加成功,則傳回一個大于等于0的整數,表示添加的鍵值對數量;如果添加失敗,則傳回一個小于0的整數,表示錯誤代碼。

該方法可用于向鍵值資料庫中添加指定的鍵值對,隻有在指定鍵已經存在時才執行添加操作,可以根據需要在程式中調用該方法進行添加操作。需要注意的是,由于該方法可以通路一個DB類型的變量,是以可以對鍵值資料庫進行寫操作。在使用時需要注意線程安全性,以避免多個goroutine并發通路同一個鍵值資料庫時出現的問題。

func (db *DB) PutIfExists(key string, entity *database.DataEntity) int {
   return db.data.PutIfExists(key, entity)
}           

▌3.6 PutIfAbsent方法

該方法用于向鍵值資料庫中添加一個鍵值對,隻有在指定鍵不存在時才執行添加操作。

該方法接受兩個參數,第一個參數是一個string類型的值,表示要添加的鍵。第二個參數是一個指向DataEntity類型的指針,表示要添加的值。該方法傳回一個int類型的值,表示添加操作的結果。如果添加成功,則傳回一個大于等于0的整數,表示添加的鍵值對數量;如果添加失敗,則傳回一個小于0的整數,表示錯誤代碼。

在方法内部,使用db.data.PutIfAbsent()方法向鍵值資料庫中添加指定的鍵值對。該方法隻有在指定鍵不存在時才執行添加操作。如果添加成功,則傳回一個大于等于0的整數,表示添加的鍵值對數量;如果添加失敗,則傳回一個小于0的整數,表示錯誤代碼。

該方法可用于向鍵值資料庫中添加指定的鍵值對,隻有在指定鍵不存在時才執行添加操作,可以根據需要在程式中調用該方法進行添加操作。需要注意的是,由于該方法可以通路一個DB類型的變量,是以可以對鍵值資料庫進行寫操作。在使用時需要注意線程安全性,以避免多個goroutine并發通路同一個鍵值資料庫時出現的問題。

func (db *DB) PutIfAbsent(key string, entity *database.DataEntity) int {
   return db.data.PutIfAbsent(key, entity)
}           

▌4、command結構體

每一個指令都是一個command結構體,每一個command結構體裡面有一個執行方法,我們去實作這個執行非法,施加到DB上。

該結構體用于表示一個Redis指令的定義,包括指令處理函數和指令的參數個數限制。

該結構體有兩個字段,分别為executor和arity。executor是一個ExecFunc類型的值,表示該指令的處理函數。arity是一個int類型的值,表示該指令所允許的參數個數限制。如果該值為正數,則表示該指令隻允許具有該數量的參數;如果該值為負數,則表示該指令允許的最少參數個數為-arity。

該結構體可用于表示Redis指令的定義,可以在程式中定義一組指令并儲存在一個指令清單中,用于執行Redis協定中的指令操作。需要注意的是,由于該結構體隻是一個指令的定義,它本身并不包含指令的具體實作,是以需要在程式中定義對應的指令處理函數,并将其與該指令的定義關聯起來。

type command struct {
   executor ExecFunc
   arity    int
}           

定義一個變量:

var cmdTable = make(map[string]*command)           

該變量是一個map類型的值,用于儲存Redis指令的定義。

該變量的鍵是一個string類型的值,表示Redis指令的名稱。該變量的值是一個指向command類型的指針,表示對應指令的定義。

該變量可用于在程式中儲存Redis指令的定義。可以通過定義一組command類型的值,并将其儲存在cmdTable中,來定義一組可供Redis協定執行的指令操作。在程式中執行Redis指令時,可以根據指令名稱從cmdTable中查找對應的指令定義,并執行該指令的處理函數。

需要注意的是,由于cmdTable是一個全局變量,是以可以在程式的任何部分通路它。在多線程環境下,需要注意對cmdTable的讀寫操作的線程安全性,以避免多個goroutine并發通路cmdTable時出現的問題。

實作一個注冊指令的方法:

func RegisterCommand(name string, executor ExecFunc, arity int) {
  name = strings.ToLower(name)
  cmdTable[name] = &command{
    executor: executor,
      arity:    arity,
  }
}           

該函數用于向Redis指令清單中添加一個新的指令定義。

該函數接受三個參數,分别為name、executor和arity。name是一個string類型的值,表示要添加的指令的名稱。executor是一個ExecFunc類型的值,表示要添加的指令的處理函數。arity是一個int類型的值,表示要添加的指令所允許的參數個數限制。

在函數内部,首先将name轉換為小寫字母形式,并将其作為鍵,建立一個新的command類型的值,将executor和arity設定為對應的值,并将該值儲存在cmdTable中,以完成對新指令的定義。

該函數可用于向Redis指令清單中添加一個新的指令定義,可以在程式中調用該函數來添加自定義的Redis指令。需要注意的是,由于cmdTable是一個全局變量,是以可以在程式的任何部分通路它。在多線程環境下,需要注意對cmdTable的讀寫操作的線程安全性,以避免多個goroutine并發通路cmdTable時出現的問題。

接下來我們就來實作我們需要的指令:

▌5、實作ping指令

建立一個database/ping.go檔案用于實作ping指令,接收到一個ping,傳回pong

Ping函數接受兩個參數,分别為db和args。db是一個指向DB類型的指針,表示要操作的鍵值資料庫。args是一個[][]byte類型的值,表示PING指令所包含的參數清單。

在函數内部,首先判斷args的長度,如果args的長度為0,則傳回一個PongReply類型的值,表示對PING指令的響應為PONG。如果args的長度為1,則傳回一個StatusReply類型的值,該值的内容為args[0]所表示的字元串。如果args的長度不為0也不為1,則傳回一個ErrReply類型的值,該值的内容為"ERR wrong number of arguments for 'ping' command",表示參數個數錯誤。

該函數可用于處理Redis協定中的PING指令,在程式中調用該函數來執行PING指令,并傳回對應的響應。需要注意的是,由于該函數可以通路一個DB類型的變量,是以可以對鍵值資料庫進行讀寫操作。在使用時需要注意線程安全性,以避免多個goroutine并發通路同一個鍵值資料庫時出現的問題。

func Ping(db *DB, args [][]byte) resp.Reply {
  if len(args) == 0 {
    return &reply.PongReply{}
  } else if len(args) == 1 {
    return reply.MakeStatusReply(string(args[0]))
  } else {
    return reply.MakeErrReply("ERR wrong number of arguments for 'ping' command")
  }
}           

▌6、實作KEY指令

▌6.1 execDel方法

該函數用于處理Redis協定中的DEL指令,即删除一個或多個指定的鍵。

該函數接受兩個參數,分别為db和args。db是一個指向DB類型的指針,表示要操作的鍵值資料庫。args是一個[][]byte類型的值,表示DEL指令所包含的參數清單,即要删除的鍵的清單。

在函數内部,首先建立一個string類型的切片keys,将args中的所有[]byte類型的值轉換為對應的字元串,并儲存到keys中。然後調用db.Removes()方法,将keys作為參數傳遞給該方法,以執行删除操作。最後,将删除的鍵的數量作為int64類型的值,使用MakeIntReply()方法建立一個IntReply類型的值,并将其作為函數的傳回值傳回。

該函數可用于處理Redis協定中的DEL指令,在程式中調用該函數來執行DEL指令,并傳回對應的響應。需要注意的是,由于該函數可以通路一個DB類型的變量,是以可以對鍵值資料庫進行寫操作。在使用時需要注意線程安全性,以避免多個goroutine并發通路同一個鍵值資料庫時出現的問題。

func execDel(db *DB, args [][]byte) resp.Reply {
  keys := make([]string, len(args))
  for i, v := range args {
    keys[i] = string(v)
  }

  deleted := db.Removes(keys...)
  return reply.MakeIntReply(int64(deleted))
}           

▌6.2 execExists方法

該函數用于處理Redis協定中的EXISTS指令,即檢查一個或多個指定的鍵是否存在于鍵值資料庫中。

該函數接受兩個參數,分别為db和args。db是一個指向DB類型的指針,表示要操作的鍵值資料庫。args是一個[][]byte類型的值,表示EXISTS指令所包含的參數清單,即要檢查的鍵的清單。

在函數内部,首先建立一個int64類型的變量result,用于儲存存在于鍵值資料庫中的鍵的數量,初始值為0。然後周遊args中的所有[][]byte類型的值,将其轉換為對應的字元串,并使用db.GetEntity()方法檢查該鍵是否存在于鍵值資料庫中。如果存在,則将result的值加1。最後,使用MakeIntReply()方法建立一個IntReply類型的值,将result的值作為參數傳遞給該方法,并将其作為函數的傳回值傳回。

該函數可用于處理Redis協定中的EXISTS指令,在程式中調用該函數來執行EXISTS指令,并傳回對應的響應。需要注意的是,由于該函數可以通路一個DB類型的變量,是以可以對鍵值資料庫進行讀操作。在使用時需要注意線程安全性,以避免多個goroutine并發通路同一個鍵值資料庫時出現的問題。

func execExists(db *DB, args [][]byte) resp.Reply {
  result := int64(0)
  for _, arg := range args {
    key := string(arg)
    _, exists := db.GetEntity(key)
    if exists {
      result++
    }
  }
  return reply.MakeIntReply(result)
}           

▌6.3 execFlushDB方法

該函數用于處理Redis協定中的FLUSHDB指令,即清空目前資料庫中的所有鍵值對。

該函數接受兩個參數,分别為db和args。db是一個指向DB類型的指針,表示要操作的鍵值資料庫。args是一個[][]byte類型的值,表示FLUSHDB指令所包含的參數清單,但在實際應用中該參數清單為空。

在函數内部,調用db.Flush()方法,該方法會清空目前資料庫中的所有鍵值對。然後使用OkReply()方法建立一個OkReply類型的值,并将其作為函數的傳回值傳回。

該函數可用于處理Redis協定中的FLUSHDB指令,在程式中調用該函數來執行FLUSHDB指令,并傳回對應的響應。需要注意的是,由于該函數可以通路一個DB類型的變量,是以可以對鍵值資料庫進行寫操作。在使用時需要注意線程安全性,以避免多個goroutine并發通路同一個鍵值資料庫時出現的問題。

func execFlushDB(db *DB, args [][]byte) resp.Reply {
  db.Flush()
  return &reply.OkReply{}
}           

▌6.4 execType方法

該函數用于處理Redis協定中的TYPE指令,即擷取指定鍵的資料類型。

該函數接受兩個參數,分别為db和args。db是一個指向DB類型的指針,表示要操作的鍵值資料庫。args是一個[][]byte類型的值,表示TYPE指令所包含的參數清單,即要擷取資料類型的鍵的名稱。

在函數内部,首先将args中的第一個[][]byte類型的值轉換為對應的字元串,并使用db.GetEntity()方法擷取該鍵對應的Entity類型的值。如果該鍵不存在于鍵值資料庫中,則使用MakeStatusReply()方法建立一個StatusReply類型的值,将"none"作為參數傳遞給該方法,并将其作為函數的傳回值傳回。如果該鍵存在于鍵值資料庫中,則使用switch語句判斷該Entity類型的值的Data字段的類型。如果Data字段的類型為[]byte,則使用MakeStatusReply()方法建立一個StatusReply類型的值,将"string"作為參數傳遞給該方法,并将其作為函數的傳回值傳回。如果Data字段的類型不為[]byte,則使用UnknownErrReply()方法建立一個UnknownErrReply類型的值,并将其作為函數的傳回值傳回。

該函數可用于處理Redis協定中的TYPE指令,在程式中調用該函數來執行TYPE指令,并傳回對應的響應。需要注意的是,由于該函數可以通路一個DB類型的變量,是以可以對鍵值資料庫進行讀操作。在使用時需要注意線程安全性,以避免多個goroutine并發通路同一個鍵值資料庫時出現的問題。

func execType(db *DB, args [][]byte) resp.Reply {
  key := string(args[0])
  entity, exists := db.GetEntity(key)
  if !exists {
    return reply.MakeStatusReply("none")
  }
  switch entity.Data.(type) {
    case []byte:
    return reply.MakeStatusReply("string")
  }
      return &reply.UnknownErrReply{}
  }           

▌6.5 execRename方法

該函數用于處理Redis協定中的RENAME指令,即将一個鍵名改為另一個鍵名。

該函數接受兩個參數,分别為db和args。db是一個指向DB類型的指針,表示要操作的鍵值資料庫。args是一個[][]byte類型的值,表示RENAME指令所包含的參數清單,即要被重命名的鍵的名稱和新名稱。

在函數内部,首先判斷args的長度是否為2,如果不是,則使用MakeErrReply()方法建立一個ErrReply類型的值,将"ERR wrong number of arguments for 'rename' command"作為參數傳遞給該方法,并将其作為函數的傳回值傳回。如果args的長度為2,則将args中的第一個[][]byte類型的值和第二個[][]byte類型的值分别轉換為對應的字元串,分别儲存到src和dest變量中。接着使用db.GetEntity()方法擷取鍵名為src的Entity類型的值,如果該鍵不存在于鍵值資料庫中,則使用MakeErrReply()方法建立一個ErrReply類型的值,将"no such key"作為參數傳遞給該方法,并将其作為函數的傳回值傳回。

如果該鍵存在于鍵值資料庫中,則使用db.PutEntity()方法将鍵名為dest的Entity類型的值設定為該Entity類型的值,并使用db.Remove()方法删除鍵名為src的鍵值對。最後,使用OkReply()方法建立一個OkReply類型的值,并将其作為函數的傳回值傳回。

該函數可用于處理Redis協定中的RENAME指令,在程式中調用該函數來執行RENAME指令,并傳回對應的響應。需要注意的是,由于該函數可以通路一個DB類型的變量,是以可以對鍵值資料庫進行讀寫操作。在使用時需要注意線程安全性,以避免多個goroutine并發通路同一個鍵值資料庫時出現的問題。

func execRename(db *DB, args [][]byte) resp.Reply {
  if len(args) != 2 {
    return reply.MakeErrReply("ERR wrong number of arguments for 'rename' command")
  }
  src := string(args[0])
  dest := string(args[1])

  entity, ok := db.GetEntity(src)
  if !ok {
    return reply.MakeErrReply("no such key")
  }
  db.PutEntity(dest, entity)
  db.Remove(src)
  return &reply.OkReply{}
}           

▌6.6 execRenameNx方法

該函數用于處理Redis協定中的RENAMENX指令,即将一個鍵名改為另一個鍵名,當且僅當新鍵名不存在時才執行操作。

該函數接受兩個參數,分别為db和args。db是一個指向DB類型的指針,表示要操作的鍵值資料庫。args是一個[][]byte類型的值,表示RENAMENX指令所包含的參數清單,即要被重命名的鍵的名稱和新名稱。

在函數内部,首先将args中的第一個[][]byte類型的值和第二個[][]byte類型的值分别轉換為對應的字元串,分别儲存到src和dest變量中。然後使用db.GetEntity()方法檢查鍵名為dest的鍵值對是否存在于鍵值資料庫中,如果存在,則使用MakeIntReply()方法建立一個IntReply類型的值,将0作為參數傳遞給該方法,并将其作為函數的傳回值傳回。

如果不存在,則使用db.GetEntity()方法擷取鍵名為src的Entity類型的值,如果該鍵不存在于鍵值資料庫中,則使用MakeErrReply()方法建立一個ErrReply類型的值,将"no such key"作為參數傳遞給該方法,并将其作為函數的傳回值傳回。如果該鍵存在于鍵值資料庫中,則使用db.Removes()方法清空鍵名為src和dest的鍵值對的過期時間,然後使用db.PutEntity()方法将鍵名為dest的Entity類型的值設定為該Entity類型的值。最後,使用MakeIntReply()方法建立一個IntReply類型的值,将1作為參數傳遞給該方法,并将其作為函數的傳回值傳回。

該函數可用于處理Redis協定中的RENAMENX指令,在程式中調用該函數來執行RENAMENX指令,并傳回對應的響應。需要注意的是,由于該函數可以通路一個DB類型的變量,是以可以對鍵值資料庫進行讀寫操作。在使用時需要注意線程安全性,以避免多個goroutine并發通路同一個鍵值資料庫時出現的問題。

func execRenameNx(db *DB, args [][]byte) resp.Reply {
  src := string(args[0])
  dest := string(args[1])

  _, ok := db.GetEntity(dest)
  if ok {
    return reply.MakeIntReply(0)
  }

  entity, ok := db.GetEntity(src)
  if !ok {
    return reply.MakeErrReply("no such key")
  }
  db.Removes(src, dest) // clean src and dest with their ttl
  db.PutEntity(dest, entity)
  return reply.MakeIntReply(1)
}           

▌6.7 execKeys方法

該函數用于處理Redis協定中的KEYS指令,即根據給定的通配符模式,傳回所有與模式比對的鍵名。

該函數接受兩個參數,分别為db和args。db是一個指向DB類型的指針,表示要操作的鍵值資料庫。args是一個[][]byte類型的值,表示KEYS指令所包含的參數清單,即通配符模式。

在函數内部,首先将args中的第一個[][]byte類型的值轉換為對應的字元串,并使用wildcard.CompilePattern()方法将其編譯為通配符模式。然後建立一個空的[][]byte類型的值result,用于儲存所有比對的鍵名。接着使用db.data.ForEach()方法周遊整個鍵值資料庫,對于每個鍵值對,判斷其鍵名是否與模式比對。如果比對,則将該鍵名轉換為[]byte類型,并将其添加到result中。最後,使用MakeMultiBulkReply()方法建立一個MultiBulkReply類型的值,以result作為參數傳遞給該方法,并将其作為函數的傳回值傳回。

該函數可用于處理Redis協定中的KEYS指令,在程式中調用該函數來執行KEYS指令,并傳回對應的響應。需要注意的是,由于該函數可以通路一個DB類型的變量,是以可以對鍵值資料庫進行讀操作。在使用時需要注意線程安全性,以避免多個goroutine并發通路同一個鍵值資料庫時出現的問題。

func execKeys(db *DB, args [][]byte) resp.Reply {
  pattern := wildcard.CompilePattern(string(args[0]))
  result := make([][]byte, 0)
  db.data.ForEach(func(key string, val interface{}) bool {
    if pattern.IsMatch(key) {
      result = append(result, []byte(key))
    }
    return true
  })
  return reply.MakeMultiBulkReply(result)
}           

▌6.8 初始化方法

該函數用于初始化指令處理器,注冊并綁定不同的指令處理函數。

在函數内部,首先使用RegisterCommand()方法注冊PING指令的處理函數Ping,并将該指令的參數個數設定為-1,表示該指令可以接受任意數量的參數。

該函數可用于初始化指令處理器,在程式啟動時調用該函數注冊不同的指令處理函數,以便後續處理用戶端請求。需要注意的是,不同的指令處理函數可能需要不同數量的參數,具體參數數量需要根據指令的語義和實作進行設定。

func init() {
  RegisterCommand("ping", Ping, -1)
}           

▌7、實作string指令

▌7.1 getAsString方法

該方法用于擷取一個鍵對應的字元串類型的值。

該方法接受一個參數,即要擷取值的鍵名key。

在方法内部,首先使用db.GetEntity()方法擷取鍵名為key的Entity類型的值,如果該鍵不存在于鍵值資料庫中,則傳回nil和nil。如果該鍵存在于鍵值資料庫中,則判斷該Entity類型的值的Data字段的類型是否為[]byte類型。如果是,則将該字段轉換為[]byte類型的值,并将其作為函數的第一個傳回值傳回。如果不是,則使用WrongTypeErrReply()方法建立一個WrongTypeErrReply類型的值,并将其作為函數的第二個傳回值傳回。

該方法可用于擷取鍵值資料庫中指定鍵名的字元串類型的值。需要注意的是,在使用該方法前需要确定該鍵名對應的值确實為字元串類型,否則将導緻類型不比對的錯誤。

func (db *DB) getAsString(key string) ([]byte, reply.ErrorReply) {
  entity, ok := db.GetEntity(key)
  if !ok {
    return nil, nil
  }
  bytes, ok := entity.Data.([]byte)
  if !ok {
    return nil, &reply.WrongTypeErrReply{}
  }
  return bytes, nil
}           

▌7.2 execGet方法

該函數用于處理Redis協定中的GET指令,即擷取指定鍵名的值。

該函數接受兩個參數,分别為db和args。db是一個指向DB類型的指針,表示要操作的鍵值資料庫。args是一個[][]byte類型的值,表示GET指令所包含的參數清單,即要擷取值的鍵名。

在函數内部,首先将args中的第一個[][]byte類型的值轉換為對應的字元串,并使用db.getAsString()方法擷取該鍵名對應的字元串類型的值。如果該鍵不存在于鍵值資料庫中,則傳回一個ErrorReply類型的值err。如果該鍵存在于鍵值資料庫中,則将其轉換為[]byte類型的值,并儲存到bytes變量中。如果bytes為nil,則使用NullBulkReply()方法建立一個NullBulkReply類型的值,并将其作為函數的傳回值傳回。否則,使用MakeBulkReply()方法建立一個BulkReply類型的值,以bytes作為參數傳遞給該方法,并将其作為函數的傳回值傳回。

該函數可用于處理Redis協定中的GET指令,在程式中調用該函數來執行GET指令,并傳回對應的響應。需要注意的是,由于該函數可以通路一個DB類型的變量,是以可以對鍵值資料庫進行讀操作。在使用時需要注意線程安全性,以避免多個goroutine并發通路同一個鍵值資料庫時出現的問題。

func execGet(db *DB, args [][]byte) resp.Reply {
  key := string(args[0])
  bytes, err := db.getAsString(key)
  if err != nil {
    return err
  }
  if bytes == nil {
    return &reply.NullBulkReply{}
  }
  return reply.MakeBulkReply(bytes)
}           

▌7.3 execSet方法

該函數用于處理Redis協定中的SET指令,即設定鍵名對應的值。

該函數接受兩個參數,分别為db和args。db是一個指向DB類型的指針,表示要操作的鍵值資料庫。args是一個[][]byte類型的值,表示SET指令所包含的參數清單,即要設定值的鍵名和值。

在函數内部,首先将args中的第一個[][]byte類型的值轉換為對應的字元串,并将args中的第二個[][]byte類型的值儲存到value變量中。然後建立一個DataEntity類型的值entity,将value作為其Data字段的值,并将該值設定為鍵名為key的鍵值對的值,使用db.PutEntity()方法将其儲存到鍵值資料庫中。最後,使用OkReply()方法建立一個OkReply類型的值,并将其作為函數的傳回值傳回。

該函數可用于處理Redis協定中的SET指令,在程式中調用該函數來執行SET指令,并傳回對應的響應。需要注意的是,由于該函數可以通路一個DB類型的變量,是以可以對鍵值資料庫進行寫操作。在使用時需要注意線程安全性,以避免多個goroutine并發通路同一個鍵值資料庫時出現的問題。

func execSet(db *DB, args [][]byte) resp.Reply {
  key := string(args[0])
  value := args[1]
  entity := &database.DataEntity{
    Data: value,
  }
  db.PutEntity(key, entity)
  return &reply.OkReply{}
}           

▌7.4 execSetNX方法

該函數用于處理Redis協定中的SETNX指令,即設定鍵名對應的值,當且僅當該鍵名不存在時才執行操作。

該函數接受兩個參數,分别為db和args。db是一個指向DB類型的指針,表示要操作的鍵值資料庫。args是一個[][]byte類型的值,表示SETNX指令所包含的參數清單,即要設定值的鍵名和值。

在函數内部,首先将args中的第一個[][]byte類型的值轉換為對應的字元串,并将args中的第二個[][]byte類型的值儲存到value變量中。然後建立一個DataEntity類型的值entity,将value作為其Data字段的值,并使用db.PutIfAbsent()方法将該值設定為鍵名為key的鍵值對的值。該方法會傳回一個bool類型的值result,表示是否執行了設定操作。最後,使用MakeIntReply()方法建立一個IntReply類型的值,以result轉換為int64類型的值作為參數傳遞給該方法,并将其作為函數的傳回值傳回。

該函數可用于處理Redis協定中的SETNX指令,在程式中調用該函數來執行SETNX指令,并傳回對應的響應。需要注意的是,由于該函數可以通路一個DB類型的變量,是以可以對鍵值資料庫進行寫操作。在使用時需要注意線程安全性,以避免多個goroutine并發通路同一個鍵值資料庫時出現的問題。

func execSetNX(db *DB, args [][]byte) resp.Reply {
  key := string(args[0])
  value := args[1]
  entity := &database.DataEntity{
    Data: value,
  }
  result := db.PutIfAbsent(key, entity)
  return reply.MakeIntReply(int64(result))
}           

▌7.5 execGetSet方法

該函數用于處理Redis協定中的GETSET指令,即設定鍵名對應的值,并傳回該鍵名原來的值。

該函數接受兩個參數,分别為db和args。db是一個指向DB類型的指針,表示要操作的鍵值資料庫。args是一個[][]byte類型的值,表示GETSET指令所包含的參數清單,即要設定值的鍵名和值。

在函數内部,首先将args中的第一個[][]byte類型的值轉換為對應的字元串,并将args中的第二個[][]byte類型的值儲存到value變量中。然後使用db.GetEntity()方法擷取鍵名為key的Entity類型的值,并将exists變量設定為該鍵名是否存在于鍵值資料庫中的bool類型的值。接着使用db.PutEntity()方法将一個DataEntity類型的值作為鍵名為key的鍵值對的值。最後,如果exists為false,則使用MakeNullBulkReply()方法建立一個NullBulkReply類型的值,并将其作為函數的傳回值傳回。否則,将舊的值(即entity.Data)轉換為[]byte類型的值,并使用MakeBulkReply()方法建立一個BulkReply類型的值,以該值作為參數傳遞給該方法,并将其作為函數的傳回值傳回。

該函數可用于處理Redis協定中的GETSET指令,在程式中調用該函數來執行GETSET指令,并傳回對應的響應。需要注意的是,由于該函數可以通路一個DB類型的變量,是以可以對鍵值資料庫進行讀寫操作。在使用時需要注意線程安全性,以避免多個goroutine并發通路同一個鍵值資料庫時出現的問題。

func execGetSet(db *DB, args [][]byte) resp.Reply {
  key := string(args[0])
  value := args[1]

  entity, exists := db.GetEntity(key)
  db.PutEntity(key, &database.DataEntity{Data: value})
  if !exists {
    return reply.MakeNullBulkReply()
  }
old := entity.Data.([]byte)
return reply.MakeBulkReply(old)
}           

▌7.6 execStrLen方法

該函數用于處理Redis協定中的STRLEN指令,即擷取鍵名對應的字元串類型的值的長度。

該函數接受兩個參數,分别為db和args。db是一個指向DB類型的指針,表示要操作的鍵值資料庫。args是一個[][]byte類型的值,表示STRLEN指令所包含的參數清單,即要擷取長度的鍵名。

在函數内部,首先将args中的第一個[][]byte類型的值轉換為對應的字元串,并使用db.GetEntity()方法擷取鍵名為key的Entity類型的值,并将exists變量設定為該鍵名是否存在于鍵值資料庫中的bool類型的值。接着,如果exists為false,則使用MakeNullBulkReply()方法建立一個NullBulkReply類型的值,并将其作為函數的傳回值傳回。否則,将擷取到的值(即entity.Data)轉換為[]byte類型的值,并使用len()函數擷取該值的長度,然後使用MakeIntReply()方法建立一個IntReply類型的值,以該長度轉換為int64類型的值作為參數傳遞給該方法,并将其作為函數的傳回值傳回。

該函數可用于處理Redis協定中的STRLEN指令,在程式中調用該函數來執行STRLEN指令,并傳回對應的響應。需要注意的是,由于該函數可以通路一個DB類型的變量,是以可以對鍵值資料庫進行讀操作。在使用時需要注意線程安全性,以避免多個goroutine并發通路同一個鍵值資料庫時出現的問題。

func execStrLen(db *DB, args [][]byte) resp.Reply {
  key := string(args[0])
  entity, exists := db.GetEntity(key)
  if !exists {
    return reply.MakeNullBulkReply()
  }
  old := entity.Data.([]byte)
  return reply.MakeIntReply(int64(len(old)))
}           

▌7.7 init方法

該函數用于初始化指令處理器,注冊并綁定不同的指令處理函數。

在函數内部,首先使用RegisterCommand()方法注冊GET指令的處理函數execGet,并将該指令的參數個數設定為2。然後使用RegisterCommand()方法注冊SET指令的處理函數execSet,并将該指令的參數個數設定為-3,表示該指令可以接受任意數量的參數。接着使用RegisterCommand()方法注冊SETNX指令的處理函數execSetNX,并将該指令的參數個數設定為3。然後使用RegisterCommand()方法注冊GETSET指令的處理函數execGetSet,并将該指令的參數個數設定為3。最後使用RegisterCommand()方法注冊STRLEN指令的處理函數execStrLen,并将該指令的參數個數設定為2。

該函數可用于初始化指令處理器,在程式啟動時調用該函數注冊不同的指令處理函數,以便後續處理用戶端請求。需要注意的是,不同的指令處理函數可能需要不同數量的參數,具體參數數量需要根據指令的語義和實作進行設定。

func init() {
  RegisterCommand("Get", execGet, 2)
  RegisterCommand("Set", execSet, -3)
  RegisterCommand("SetNx", execSetNX, 3)
  RegisterCommand("GetSet", execGetSet, 3)
  RegisterCommand("StrLen", execStrLen, 2)
}           

▌8、Database

我們在上一章為了測試,寫了一個echoDataBase,現在我們來實作一個真實的Database

type Database struct {
  dbSet []*DB
}           

該結構體包含一個dbSet成員變量,類型為[]*DB,表示一個DB類型的指針數組。該結構體用于表示Redis伺服器中的資料庫集合,每個元素表示一個資料庫。

在Redis伺服器中,可以通過SELECT指令來切換目前使用的資料庫。每個資料庫都是一個鍵值對資料庫,可以存儲任意類型的值。在該結構體中,dbSet數組中的每個元素都是一個DB類型的指針,表示一個鍵值對資料庫。可以通過周遊dbSet數組來通路不同的資料庫,并對其進行讀寫操作。

▌8.1 NewDatabase

該函數用于建立一個新的Database類型的值,并傳回其指針。

在函數内部,首先建立一個空的Database類型的值,并将其指派給mdb變量。然後判斷配置參數中指定的資料庫數量是否為0,如果為0,則将其設定為預設值16。接着,使用make()函數建立一個長度為config.Properties.Databases的[]*DB類型的值,并将其指派給mdb.dbSet變量。接下來,周遊mdb.dbSet數組中的每個元素,使用makeDB()函數建立一個新的DB類型的值,并将其指派給singleDB變量。然後将周遊到的元素的索引值指派給singleDB的index成員變量,并将singleDB指派給mdb.dbSet數組中的相應元素。最後,将mdb的指針作為函數的傳回值傳回。

該函數可用于建立一個新的Database類型的值,并初始化其中的每個DB類型的值。在程式啟動時調用該函數來建立Redis伺服器使用的資料庫集合。需要注意的是,根據配置參數的不同,該函數可能會建立不同數量的資料庫。

func NewDatabase() *Database {
  mdb := &Database{}
  if config.Properties.Databases == 0 {
    config.Properties.Databases = 16
  }
  mdb.dbSet = make([]*DB, config.Properties.Databases)
  for i := range mdb.dbSet {
    singleDB := makeDB()
    singleDB.index = i
    mdb.dbSet[i] = singleDB
  }
  return mdb
}           

▌8.2 Exec方法

該方法用于執行Redis指令,根據指令名稱調用相應的指令處理函數。

該方法接受兩個參數,分别為c和cmdLine。其中,c表示一個resp.Connection類型的值,表示一個用戶端連接配接。cmdLine是一個[][]byte類型的值,表示Redis指令的參數清單。

在函數内部,首先使用defer和recover函數實作了異常捕獲和處理。然後将cmdLine中的第一個參數轉換為小寫字母,并将其指派給cmdName變量。接着判斷cmdName是否等于"select",如果是,則調用execSelect()函數來處理SELECT指令,并将其傳回值作為函數的傳回值傳回。如果cmdName不等于"select",則擷取目前用戶端連接配接所選擇的資料庫的索引值,并從mdb.dbSet數組中選擇相應的DB類型的值。然後調用該DB類型的值的Exec方法來處理指令,并将其傳回值作為函數的傳回值傳回。

該方法可用于執行Redis指令,并根據指令名稱調用相應的指令處理函數。在程式中調用該方法來處理用戶端請求。需要注意的是,由于該方法可以通路一個Database類型的變量,是以可以對其中的每個DB類型的變量進行讀寫操作。在使用時需要注意線程安全性,以避免多個goroutine并發通路同一個Database類型的變量時出現的問題。

func (mdb *Database) Exec(c resp.Connection, cmdLine [][]byte) (result resp.Reply) {
  defer func() {
    if err := recover(); err != nil {
      logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack())))
    }
  }()

  cmdName := strings.ToLower(string(cmdLine[0]))
  if cmdName == "select" {
    if len(cmdLine) != 2 {
      return reply.MakeArgNumErrReply("select")
    }
    return execSelect(c, mdb, cmdLine[1:])
  }
  // normal commands
  dbIndex := c.GetDBIndex()
  selectedDB := mdb.dbSet[dbIndex]
  return selectedDB.Exec(c, cmdLine)
}           

▌8.3 execSelect方法

該函數用于處理Redis中的SELECT指令,切換目前使用的資料庫。

該函數接受三個參數,分别為c、mdb和args。其中,c表示一個resp.Connection類型的值,表示一個用戶端連接配接。mdb表示一個Database類型的指針,表示Redis伺服器的資料庫集合。args是一個[][]byte類型的值,表示Redis指令的參數清單。

在函數内部,首先使用strconv.Atoi()函數将args中的第一個參數轉換為整數,并将其指派給dbIndex變量。如果轉換失敗,則傳回一個錯誤響應。接着,判斷dbIndex是否大于等于mdb.dbSet數組的長度,如果是,則傳回一個超出範圍的錯誤響應。否則,調用c的SelectDB方法,将dbIndex作為參數傳入,以切換目前使用的資料庫。最後,傳回一個OK響應。

該函數可用于處理Redis中的SELECT指令,切換目前使用的資料庫。在程式中調用該函數來處理用戶端請求。需要注意的是,由于該函數可以通路一個Database類型的變量,是以可以對其中的每個DB類型的變量進行讀寫操作。在使用時需要注意線程安全性,以避免多個goroutine并發通路同一個Database類型的變量時出現的問題。

func execSelect(c resp.Connection, mdb *Database, args [][]byte) resp.Reply {
  dbIndex, err := strconv.Atoi(string(args[0]))
  if err != nil {
    return reply.MakeErrReply("ERR invalid DB index")
  }
  if dbIndex >= len(mdb.dbSet) {
    return reply.MakeErrReply("ERR DB index is out of range")
  }
  c.SelectDB(dbIndex)
  return reply.MakeOkReply()
}           

▌GO實作Redis持久化:

本章将介紹如何使用 golang 實作 Append Only File 持久化

▌1、AofHandler

建立檔案aof/aof.go

▌1.1 AofHandler結構體

定義AofHandler結構體。該結構體用于處理Redis的AOF(Append Only File)持久化功能,将Redis指令記錄到磁盤檔案中以保證資料的持久化。

該結構體包含了以下成員變量:

  • db:表示一個databaseface.Database類型的值,表示Redis伺服器的資料庫集合。
  • aofChan:表示一個chan *payload類型的通道,用于接收需要記錄到AOF檔案的Redis指令。
  • aofFile:表示一個*os.File類型的指針,表示目前正在記錄的AOF檔案。
  • aofFilename:表示一個字元串類型的值,表示目前正在使用的AOF檔案的檔案名。
  • currentDB:表示一個整數類型的值,表示目前正在使用的資料庫的索引值。

AofHandler結構體可用于實作Redis的AOF持久化功能,将Redis指令記錄到磁盤檔案中以保證資料的持久化。在程式中,可以通過建立一個AofHandler類型的值并調用其相應的方法來實作AOF持久化功能。

type AofHandler struct {
  db          databaseface.Database
  aofChan     chan *payload
  aofFile     *os.File
  aofFilename string
  currentDB   int
}           

▌1.2 實作AddAof

該方法用于将Redis指令添加到AOF持久化日志中。

該方法接受兩個參數,分别為dbIndex和cmdLine。其中,dbIndex表示目前使用的資料庫的索引值,cmdLine表示一個CmdLine類型的值,表示Redis指令的參數清單。

在函數内部,首先判斷是否啟用AOF持久化功能,如果未啟用,則直接傳回。否則,将cmdLine和dbIndex封裝成一個payload類型的值,并将其發送到aofChan通道中。

該方法可用于将Redis指令添加到AOF持久化日志中。在程式中調用該方法來實作AOF持久化功能。需要注意的是,由于該方法可以通路AofHandler結構體中的成員變量,是以需要確定線程安全性,以避免多個goroutine并發通路同一個AofHandler類型的變量時出現的問題。

func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) {
  if config.Properties.AppendOnly && handler.aofChan != nil {
    handler.aofChan <- &payload{
      cmdLine: cmdLine,
        dbIndex: dbIndex,
    }
  }
}           

▌1.3 實作handleAof

handleAof函數會不斷地從一個AOF通道(handler.aofChan)中讀取指令,然後将這些指令寫入到AOF檔案中。具體來說,該函數會周遊通道中的每個指令,如果該指令所在的資料庫不是目前正在處理的資料庫,則會先發送一個SELECT指令來切換到相應的資料庫。然後,将該指令轉化為Redis協定格式的位元組流,寫入到AOF檔案中。

如果寫入AOF檔案失敗,則會記錄日志并跳過該指令。在實際應用中,該函數通常在背景線程中運作,不斷地從AOF通道中讀取指令,并将它們寫入到AOF檔案中。同時,Redis還提供了另一種持久化方式——RDB(Redis Database File),它會将整個資料庫的狀态儲存到一個二進制檔案中。與AOF相比,RDB的優點是檔案體積小,恢複速度快,但缺點是可能會丢失最近的一部分寫操作。

func (handler *AofHandler) handleAof() {
  // serialized execution
  handler.currentDB = 0
  for p := range handler.aofChan {
    if p.dbIndex != handler.currentDB {
      // select db
      data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
      _, err := handler.aofFile.Write(data)
      if err != nil {
        logger.Warn(err)
        continue // skip this command
      }
      handler.currentDB = p.dbIndex
    }
    data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes()
    _, err := handler.aofFile.Write(data)
    if err != nil {
      logger.Warn(err)
    }
  }
}           

▌2、實作Aof落盤功能

▌2.1 Database結構體

改造Database結構體,讓它持有aofHandler:

type Database struct {
  dbSet      []*DB
  aofHandler *aof.AofHandler
}           

初始化aofHandler,在NewDatabase方法初始化話16個db之後初始化aofHandler

這段代碼是根據配置檔案中的AppendOnly參數來啟用或禁用Redis的AOF持久化機制。如果配置檔案中的AppendOnly參數為true,則會建立一個新的AOFHandler,并将其設定為mdb(一個Redis資料庫執行個體)的aofHandler屬性。同時,該代碼還會為每個資料庫(dbSet中的每個元素)設定一個addAof函數,該函數用于将指令寫入到AOF檔案中。

具體來說,該代碼會先建立一個新的AOFHandler,并将其與mdb對象關聯起來。然後,對于mdb中的每個資料庫,該代碼會建立一個addAof函數,該函數會将指令寫入到AOFHandler中。在建立addAof函數時,為了避免閉包問題,該代碼會将每個資料庫單獨指派給一個局部變量singleDB,然後在addAof函數中使用該局部變量。

總的來說,這段代碼是Redis中AOF持久化機制的啟用代碼,它會在Redis啟動時根據配置檔案中的參數來決定是否啟用AOF持久化,并将所有的寫操作寫入到AOF檔案中,以保證資料的持久性。

func NewDatabase() *Database {
  mdb := &Database{}
  if config.Properties.Databases == 0 {
    config.Properties.Databases = 16
  }
  mdb.dbSet = make([]*DB, config.Properties.Databases)
  for i := range mdb.dbSet {
    singleDB := makeDB()
    singleDB.index = i
    mdb.dbSet[i] = singleDB
  }
  if config.Properties.AppendOnly {
    aofHandler, err := aof.NewAOFHandler(mdb)
    if err != nil {
      panic(err)
    }
    mdb.aofHandler = aofHandler
    for _, db := range mdb.dbSet {
      // avoid closure
      singleDB := db
      singleDB.addAof = func(line CmdLine) {
        mdb.aofHandler.AddAof(singleDB.index, line)
      }
    }
  }
  return mdb
}           

makeDB方法執行的時候給他一個空的實作

func makeDB() *DB {
  db := &DB{
    data: dict.MakeSyncDict(),
      addAof: func(line CmdLine) {

      },
  }
  return db
}           

▌2.2 改造keys指令

删除時候需要記錄:

func execDel(db *DB, args [][]byte) resp.Reply {
  keys := make([]string, len(args))
  for i, v := range args {
    keys[i] = string(v)
  }
  deleted := db.Removes(keys...)
  /**
   Aof記錄指令
    */
  if deleted > 0 {
    db.addAof(utils.ToCmdLine2("del", args...))
  }
  return reply.MakeIntReply(int64(deleted))
}           

FlushDB的時候:

func execFlushDB(db *DB, args [][]byte) resp.Reply {
  db.Flush()
  /**
   Aof記錄删除指令
    */
  db.addAof(utils.ToCmdLine2("flushdb", args...))
  return &reply.OkReply{}
}           

重命名:

func execRename(db *DB, args [][]byte) resp.Reply {
  if len(args) != 2 {
    return reply.MakeErrReply("ERR wrong number of arguments for 'rename' command")
  }
  src := string(args[0])
  dest := string(args[1])

  entity, ok := db.GetEntity(src)
  if !ok {
    return reply.MakeErrReply("no such key")
  }
  db.PutEntity(dest, entity)
  db.Remove(src)
  /**
   Aof記錄指令
   */
  db.addAof(utils.ToCmdLine2("rename", args...))
  return &reply.OkReply{}
}           
func execRenameNx(db *DB, args [][]byte) resp.Reply {
  src := string(args[0])
  dest := string(args[1])

  _, ok := db.GetEntity(dest)
  if ok {
    return reply.MakeIntReply(0)
  }

  entity, ok := db.GetEntity(src)
  if !ok {
    return reply.MakeErrReply("no such key")
  }
  db.Removes(src, dest) // clean src and dest with their ttl
  db.PutEntity(dest, entity)
  /**
   Aof記錄指令
   */
  db.addAof(utils.ToCmdLine2("renamenx", args...))
  return reply.MakeIntReply(1)
}           

▌2.3 改造strings指令

func execSet(db *DB, args [][]byte) resp.Reply {
  key := string(args[0])
  value := args[1]
  entity := &database.DataEntity{
    Data: value,
  }
  db.PutEntity(key, entity)
  /**
   Aof記錄指令
   */
  db.addAof(utils.ToCmdLine2("set", args...))
  return &reply.OkReply{}
}           
func execSetNX(db *DB, args [][]byte) resp.Reply {
  key := string(args[0])
  value := args[1]
  entity := &database.DataEntity{
    Data: value,
  }
  result := db.PutIfAbsent(key, entity)
  /**
   Aof記錄指令
   */
  db.addAof(utils.ToCmdLine2("setnx", args...))
  return reply.MakeIntReply(int64(result))
}           
func execGetSet(db *DB, args [][]byte) resp.Reply {
  key := string(args[0])
  value := args[1]

  entity, exists := db.GetEntity(key)
  db.PutEntity(key, &database.DataEntity{Data: value})
  if !exists {
    return reply.MakeNullBulkReply()
  }
old := entity.Data.([]byte)
/**
   Aof記錄指令
   */
db.addAof(utils.ToCmdLine2("getset", args...))
return reply.MakeBulkReply(old)
}           

▌3、實作Aof恢複

這一節就來實作redis重新開機時恢複資料的功能

▌實作LoadAof方法

LoadAof是一個用于加載追加日志檔案(AOF)到資料庫中的Go函數。以下是代碼的實作思路:

  1. 使用 handler.aofFilename 指定的檔案路徑打開AOF檔案。
  2. 如果打開檔案時發生錯誤,則記錄警告并傳回。
  3. 設定 defer 語句,在函數傳回時關閉檔案。
  4. 使用解析器将AOF檔案讀取為Redis協定指令流。
  5. 建立一個虛拟連接配接對象以儲存資料庫索引。
  6. 處理流中的每個指令,使用 handler.db.Exec 在資料庫上執行它。
  7. 如果執行指令時出現錯誤,則記錄錯誤消息。

總體而言,這個函數負責從AOF檔案中讀取Redis協定指令,并在資料庫上執行它們,記錄任何出現的錯誤。

func (handler *AofHandler) LoadAof() {

  file, err := os.Open(handler.aofFilename)
  if err != nil {
    logger.Warn(err)
    return
  }
  defer file.Close()
  ch := parser.ParseStream(file)
  fakeConn := &connection.Connection{} // only used for save dbIndex
  for p := range ch {
    if p.Err != nil {
      if p.Err == io.EOF {
        break
      }
      logger.Error("parse error: " + p.Err.Error())
      continue
    }
    if p.Data == nil {
      logger.Error("empty payload")
      continue
    }
    r, ok := p.Data.(*reply.MultiBulkReply)
    if !ok {
      logger.Error("require multi bulk reply")
      continue
    }
    ret := handler.db.Exec(fakeConn, r.Args)
    if reply.IsErrorReply(ret) {
      logger.Error("exec err", err)
    }
  }
}           

▌說在後面:

本文,僅僅是《從0開始,手寫Redis》 PDF的一部分,後面的内容 更加精彩。

持續疊代、持續更新 是 尼恩團隊的宗旨,

持續疊代、持續更新 也是 《從0開始,手寫Redis》PDF的靈魂。

後面會收集更多的面試真題,同時,遇到面試難題,可以來尼恩的社群《技術自由圈(原 瘋狂創客圈)》中溝通。

咱們的目标,打造宇宙最牛的《手寫Redis》寶典。

▌11個技術聖經 PDF:

從0開始,手寫Redis

繼續閱讀