天天看點

Redis6.0 特性解析總結

  1. Redis的多路複用技術,支援epoll、kqueue、selector
  2. 5.0版本及以前,處理用戶端請求的線程隻有一個,串行處理
  3. 6.0版本引入了worker Thread,隻處理網絡IO讀取和寫入,核心IO負責串行處理用戶端指令

就在 2019 年 12 月 20 号這天,衆所期待的 Redis 新版 6.0 rc1 釋出了(Redis 6 RC1 is out today)肯定很多關注的同學都進行了試用,雖然因為引入了 c11 的 _Atomic 導緻相當多的環境都無法直接編譯成功,但是對于想一探究竟的粉絲們來說,這是完全阻擋不了的熱情

新版除了增加 

ACLS 權限控制子產品

支援更為廣泛的新協定 RESP3

用戶端緩存

無磁盤同步

Cluster Proxy

 等十來個相當實用的新特性外,還對 長久以來 社群筒子們 呼聲比較高的 多線程 進行了支援,當然也帶來 性能提升了一倍 的好處

這次我們的任務有兩個:

  • 剖析 Redis6 多線程的實作方式

  • 與 Memcached 的多線程模型(個人認為這是一個極其經典的多線程網絡程式設計案例)進行對比

衆所周知,Redis 之前的版本一直都是典型的單線程模型(

注意:這裡不是指 Redis 單執行個體中隻有一個線程,而是表示 核心操作子產品由單線程完成,當然另外還有一些 輔助線程 從旁協助,比如 LRU 的淘汰過程

),為什麼不使用多線程呢,其實原因很簡單(官方解釋)

Redis6.0 特性解析總結

簡單說來就是:

  • 根據以往的場景,普通 KV 存儲 瓶頸壓根不在 CPU,而往往可能受到 

    記憶體

     和 

    網絡I/O

     的制約
  • Redis 中有各種類型的資料操作,甚至包括一些事務處理,如果采用多線程,則會被多線程産生的切換問題而困擾,也可能因為加鎖導緻系統架構變的異常複雜,更有可能會因為加鎖解鎖甚至死鎖造成的性能損耗

當然,單線程也會有 不能充分利用多核資源 弊端,這是一個權衡;而通常 Redis(包括 Redis cluster) 的性能已經足夠我們使用

那麼,既然 單線程 都已經基本能滿足場景,更不要說還能開啟 多執行個體、上叢集 等方式,那麼為什麼還要費力引入 多線程呢? 請繼續閱讀後方内容~~~

上面提到,瓶頸往往在 

記憶體

 和 

網絡I/O

記憶體方面毋容置疑,加就是了,雖然需要注意 NUMA陷阱(請自行 Google),但是也不是不能解決;

那麼能不能對 網絡I/O 進行進一步優化進而減少消耗呢,通常做法是:

  • 采用 DPDK 從核心層對網絡處理流程子產品進行優化

    (因為需要特殊支援,是以顯得不那麼大衆)
  • 利用多核優勢

于是 Redis 開發組的各位大佬們就想到 能不能通過 支援多線程 這一簡單惠民的方式進行解決(這裡也展現了大佬們對性能的極限追求),于是就有了 下面的架構(以 Read 為例):

Redis6.0 特性解析總結

根據上方結構簡圖可以看到,Redis 6 中的多線程 主要在處理 網絡 I/O 方面,對網絡事件進行監聽,分發給 work thread 進行處理,處理完以後将主動權交還給 主線程,進行 執行操作,當然後續還會有,執行後依然交由 work thread 進行響應資料的 socket write 操作

由于有些方法裡的代碼量比較大,我們這裡按照 典型的代碼片段進行解析,同志們可以根據文章提示的代碼位置 和 代碼裡面的關鍵詞 在源碼中搜素,

可能資料結構一些元素 看不太懂什麼意思,沒關系,先混個臉熟,後面看完回頭再看過來或者單獨把代碼 clone 下來讀一下 就明白了

聲明,我們代碼均來源于 官方github https://github.com/antirez/redis/tree/6.0 ,本次源碼基本都存在與 

src/network.c

 中

redis-server 邏輯首先執行 

initThreadedIO()

函數對 線程進行初始化,當然,也包括 根據配置 

server.io_threads_num

 控制線程個數,其中主線程的處理邏輯為 

IOThreadMain()

 函數

/* networking.c: line 2666 */

void *IOThreadMain(void *myid) {

/* The ID is the thread number (from 0 to server.iothreads_num-1), and is used by the thread to just manipulate a single sub-array of clients. */

// 線程 ID,跟普通線程池的操作方式一樣,都是通過 線程ID 進行操作

long id = (unsigned long)myid;

while(1) {

/* Wait for start */

// 這裡的等待操作比較特殊,沒有使用簡單的 sleep,避免了 sleep 時間設定不當可能導緻糟糕的性能,但是也有個問題就是頻繁 loop 可能一定程度上造成 cpu 占用較長

for (int j = 0; j < 1000000; j++) {

if (io_threads_pending[id] != 0) break;

}

/* Give the main thread a chance to stop this thread. */

if (io_threads_pending[id] == 0) {

pthread_mutex_lock(&io_threads_mutex[id]);

pthread_mutex_unlock(&io_threads_mutex[id]);

continue;

}

serverAssert(io_threads_pending[id] != 0);

// debug 模式

if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));

/* Process: note that the main thread will never touch our list

* before we drop the pending count to 0. */

// 根據線程 id 以及待配置設定清單進行 任務配置設定

listIter li;

listNode *ln;

listRewind(io_threads_list[id],&li);

while((ln = listNext(&li))) {

client *c = listNodeValue(ln);

// 判斷讀寫類型

if (io_threads_op == IO_THREADS_OP_WRITE) {

writeToClient(c,0);

} else if (io_threads_op == IO_THREADS_OP_READ) {

// 這裡需要注意重複調用了 readQueryFromClient,不過不用擔心,有 CLIENT_PENDING_READ 辨別可以進行識别

readQueryFromClient(c->conn);

} else {

serverPanic("io_threads_op value is unknown");

}

}

listEmpty(io_threads_list[id]);

io_threads_pending[id] = 0;

if (tio_debug) printf("[%ld] Done\n", id);

}

}
           

handleClientsWithPendingReadsUsingThreads()

 待處理任務配置設定

/* networking.c: line 2871 */

/* When threaded I/O is also enabled for the reading + parsing side, the readable handler will just put normal clients into a queue of clients to process (instead of serving them synchronously). This function runs the queue using the I/O threads, and process them in order to accumulate the reads in the buffers, and also parse the first command available rendering it in the client structures. */

int handleClientsWithPendingReadsUsingThreads(void) {

// 是否開啟 線程讀

if (!io_threads_active || !server.io_threads_do_reads) return 0;

int processed = listLength(server.clients_pending_read);

if (processed == 0) return 0;

if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);

/* Distribute the clients across N different lists. */

// 将待處理任務進行配置設定,配置設定方式為 RR (round robin) 即基于任務到達時間片進行配置設定

listIter li;

listNode *ln;

listRewind(server.clients_pending_read,&li);

int item_id = 0;

while((ln = listNext(&li))) {

client *c = listNodeValue(ln);

int target_id = item_id % server.io_threads_num;

listAddNodeTail(io_threads_list[target_id],c);

item_id++;

}

/* Give the start condition to the waiting threads, by setting the start condition atomic var. */

// 設定任務個數參數

io_threads_op = IO_THREADS_OP_READ;

for (int j = 0; j < server.io_threads_num; j++) {

int count = listLength(io_threads_list[j]);

io_threads_pending[j] = count;

}

/* Wait for all threads to end their work. */

// 等待所有線程任務都處理完畢

while(1) {

unsigned long pending = 0;

for (int j = 0; j < server.io_threads_num; j++)

pending += io_threads_pending[j];

if (pending == 0) break;

}

if (tio_debug) printf("I/O READ All threads finshed\n");

/* Run the list of clients again to process the new buffers. */

// 繼續運作,等待新的處理任務

listRewind(server.clients_pending_read,&li);

while((ln = listNext(&li))) {

client *c = listNodeValue(ln);

c->flags &= ~CLIENT_PENDING_READ;

if (c->flags & CLIENT_PENDING_COMMAND) {

c->flags &= ~ CLIENT_PENDING_COMMAND;

processCommandAndResetClient(c);

}

processInputBufferAndReplicate(c);

}

listEmpty(server.clients_pending_read);

return processed;

}
           

處理新連接配接調用 

acceptCommonHandler()

 函數進行處理,随後調用 

createClient()

 建立 client 連接配接結構資料 并通過 

connSetReadHandler()

 設定 

readQueryFromClient()

 函數為處理 讀取事件的主要邏輯,執行順序為:

acceptTcpHandler/acceptUnixHandler
acceptCommonHandler
createClient -> connSetReadHandler
readQueryFromClient
           

readQueryFromClient()

 函數

/* networking.c: line 1791 */

void readQueryFromClient(connection *conn) {

client *c = connGetPrivateData(conn);

int nread, readlen;

size_t qblen;

/* Check if we want to read from the client later when exiting from the event loop. This is the case if threaded I/O is enabled. */

// 加入多線程模型已經啟用

if (postponeClientRead(c)) return;

// 如果沒有啟用多線程模型,則走下面繼續處理讀邏輯

// ....還有後續老邏輯

}
           

函數 

postponeClientRead()

 将任務放入處理隊列,而根據上面 

IOThreadMain()

 和 

handleClientsWithPendingReadsUsingThreads()

 的任務處理邏輯進行處理

/* networking.c: line 2852 */

int postponeClientRead(client *c) {

// 如果啟用多線程模型,并且判斷全局配置中是否支援多線程讀

if (io_threads_active &&

server.io_threads_do_reads &&

// 這裡有個點需要注意,如果是 master-slave 同步也有可能被認為是普通 讀任務,是以需要辨別

!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))

{

c->flags |= CLIENT_PENDING_READ;

// 将任務放入處理隊列

listAddNodeHead(server.clients_pending_read,c);

return 1;

} else {

return 0;

}

}
           

那麼 Redis 6 中的 多線程模型 與 Memcached 這一及其經典的多線程網絡程式設計案例中的模型 對比起來有哪些異同呢

Memcached 的線程機制(網上抄的,沒用過)

Redis6.0 特性解析總結

Memcached 伺服器采用 master-woker 模式進行工作,後再輔以 輔助線程。

服務端采用 socket 與用戶端通訊,主線程、工作線程 采用 pipe管道進行通訊。

主線程采用 libevent 監聽 listen、accept 的讀事件,事件響應後 将連接配接資訊的資料結構封裝起來 根據算法 選擇合适的工作線程,将 連接配接任務攜帶連接配接資訊 分發出去,相應的線程利用連接配接描述符 建立與 用戶端的socket連接配接 并進行後續的存取資料操作。

主線程和工作線程 處理事件流都采用狀态機進行事件轉移。

那麼顯而易見,他們的 線程模型 對比起來:

  • 相同點:都采用了 master-worker 這一經典思路
  • 不同點:Memcached 執行主邏輯也是在 worker 線程裡,模型更加簡單,不過這也歸功于 Memcached 簡易資料操作的特性産生的天然隔離;而 Redis 把處理邏輯還 交還給 master 線程,雖然一定程度上增加了模型複雜度,但是如果把處理邏輯放在 worker 線程,也很難保證隔離性

總結

Redis 多線程模型 可能無法滿足很多 粉絲設想的 類 Memcached 這種具有完全隔離性的多線程模型,但這也是各方面利弊的權衡,有失必有得

git個人開源spring cloud腳手架,快速上手開發

gitee版本個人spring cloud開源腳手架

繼續閱讀