- Redis的多路複用技術,支援epoll、kqueue、selector
- 5.0版本及以前,處理用戶端請求的線程隻有一個,串行處理
- 6.0版本引入了worker Thread,隻處理網絡IO讀取和寫入,核心IO負責串行處理用戶端指令
就在 2019 年 12 月 20 号這天,衆所期待的 Redis 新版 6.0 rc1 釋出了(Redis 6 RC1 is out today)肯定很多關注的同學都進行了試用,雖然因為引入了 c11 的 _Atomic 導緻相當多的環境都無法直接編譯成功,但是對于想一探究竟的粉絲們來說,這是完全阻擋不了的熱情
新版除了增加
、
ACLS 權限控制子產品
、
支援更為廣泛的新協定 RESP3
、
用戶端緩存
、
無磁盤同步
Cluster Proxy
等十來個相當實用的新特性外,還對 長久以來 社群筒子們 呼聲比較高的 多線程 進行了支援,當然也帶來 性能提升了一倍 的好處
這次我們的任務有兩個:
剖析 Redis6 多線程的實作方式
與 Memcached 的多線程模型(個人認為這是一個極其經典的多線程網絡程式設計案例)進行對比
衆所周知,Redis 之前的版本一直都是典型的單線程模型(
注意:這裡不是指 Redis 單執行個體中隻有一個線程,而是表示 核心操作子產品由單線程完成,當然另外還有一些 輔助線程 從旁協助,比如 LRU 的淘汰過程
),為什麼不使用多線程呢,其實原因很簡單(官方解釋)
簡單說來就是:
- 根據以往的場景,普通 KV 存儲 瓶頸壓根不在 CPU,而往往可能受到
和記憶體
的制約網絡I/O
- Redis 中有各種類型的資料操作,甚至包括一些事務處理,如果采用多線程,則會被多線程産生的切換問題而困擾,也可能因為加鎖導緻系統架構變的異常複雜,更有可能會因為加鎖解鎖甚至死鎖造成的性能損耗
當然,單線程也會有 不能充分利用多核資源 弊端,這是一個權衡;而通常 Redis(包括 Redis cluster) 的性能已經足夠我們使用
那麼,既然 單線程 都已經基本能滿足場景,更不要說還能開啟 多執行個體、上叢集 等方式,那麼為什麼還要費力引入 多線程呢? 請繼續閱讀後方内容~~~
上面提到,瓶頸往往在
記憶體
和
網絡I/O
記憶體方面毋容置疑,加就是了,雖然需要注意 NUMA陷阱(請自行 Google),但是也不是不能解決;
那麼能不能對 網絡I/O 進行進一步優化進而減少消耗呢,通常做法是:
-
(因為需要特殊支援,是以顯得不那麼大衆)采用 DPDK 從核心層對網絡處理流程子產品進行優化
-
利用多核優勢
于是 Redis 開發組的各位大佬們就想到 能不能通過 支援多線程 這一簡單惠民的方式進行解決(這裡也展現了大佬們對性能的極限追求),于是就有了 下面的架構(以 Read 為例):
根據上方結構簡圖可以看到,Redis 6 中的多線程 主要在處理 網絡 I/O 方面,對網絡事件進行監聽,分發給 work thread 進行處理,處理完以後将主動權交還給 主線程,進行 執行操作,當然後續還會有,執行後依然交由 work thread 進行響應資料的 socket write 操作
由于有些方法裡的代碼量比較大,我們這裡按照 典型的代碼片段進行解析,同志們可以根據文章提示的代碼位置 和 代碼裡面的關鍵詞 在源碼中搜素,
可能資料結構一些元素 看不太懂什麼意思,沒關系,先混個臉熟,後面看完回頭再看過來或者單獨把代碼 clone 下來讀一下 就明白了
聲明,我們代碼均來源于 官方github https://github.com/antirez/redis/tree/6.0 ,本次源碼基本都存在與
src/network.c
中
redis-server 邏輯首先執行
initThreadedIO()
函數對 線程進行初始化,當然,也包括 根據配置
server.io_threads_num
控制線程個數,其中主線程的處理邏輯為
IOThreadMain()
函數
/* networking.c: line 2666 */
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is used by the thread to just manipulate a single sub-array of clients. */
// 線程 ID,跟普通線程池的操作方式一樣,都是通過 線程ID 進行操作
long id = (unsigned long)myid;
while(1) {
/* Wait for start */
// 這裡的等待操作比較特殊,沒有使用簡單的 sleep,避免了 sleep 時間設定不當可能導緻糟糕的性能,但是也有個問題就是頻繁 loop 可能一定程度上造成 cpu 占用較長
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
/* Give the main thread a chance to stop this thread. */
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(io_threads_pending[id] != 0);
// debug 模式
if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
/* Process: note that the main thread will never touch our list
* before we drop the pending count to 0. */
// 根據線程 id 以及待配置設定清單進行 任務配置設定
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 判斷讀寫類型
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
// 這裡需要注意重複調用了 readQueryFromClient,不過不用擔心,有 CLIENT_PENDING_READ 辨別可以進行識别
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
if (tio_debug) printf("[%ld] Done\n", id);
}
}
handleClientsWithPendingReadsUsingThreads()
待處理任務配置設定
/* networking.c: line 2871 */
/* When threaded I/O is also enabled for the reading + parsing side, the readable handler will just put normal clients into a queue of clients to process (instead of serving them synchronously). This function runs the queue using the I/O threads, and process them in order to accumulate the reads in the buffers, and also parse the first command available rendering it in the client structures. */
int handleClientsWithPendingReadsUsingThreads(void) {
// 是否開啟 線程讀
if (!io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);
/* Distribute the clients across N different lists. */
// 将待處理任務進行配置設定,配置設定方式為 RR (round robin) 即基于任務到達時間片進行配置設定
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
/* Give the start condition to the waiting threads, by setting the start condition atomic var. */
// 設定任務個數參數
io_threads_op = IO_THREADS_OP_READ;
for (int j = 0; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
/* Wait for all threads to end their work. */
// 等待所有線程任務都處理完畢
while(1) {
unsigned long pending = 0;
for (int j = 0; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshed\n");
/* Run the list of clients again to process the new buffers. */
// 繼續運作,等待新的處理任務
listRewind(server.clients_pending_read,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~ CLIENT_PENDING_COMMAND;
processCommandAndResetClient(c);
}
processInputBufferAndReplicate(c);
}
listEmpty(server.clients_pending_read);
return processed;
}
處理新連接配接調用
acceptCommonHandler()
函數進行處理,随後調用
createClient()
建立 client 連接配接結構資料 并通過
connSetReadHandler()
設定
readQueryFromClient()
函數為處理 讀取事件的主要邏輯,執行順序為:
acceptTcpHandler/acceptUnixHandler
acceptCommonHandler
createClient -> connSetReadHandler
readQueryFromClient
readQueryFromClient()
函數
/* networking.c: line 1791 */
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, readlen;
size_t qblen;
/* Check if we want to read from the client later when exiting from the event loop. This is the case if threaded I/O is enabled. */
// 加入多線程模型已經啟用
if (postponeClientRead(c)) return;
// 如果沒有啟用多線程模型,則走下面繼續處理讀邏輯
// ....還有後續老邏輯
}
函數
postponeClientRead()
将任務放入處理隊列,而根據上面
IOThreadMain()
和
handleClientsWithPendingReadsUsingThreads()
的任務處理邏輯進行處理
/* networking.c: line 2852 */
int postponeClientRead(client *c) {
// 如果啟用多線程模型,并且判斷全局配置中是否支援多線程讀
if (io_threads_active &&
server.io_threads_do_reads &&
// 這裡有個點需要注意,如果是 master-slave 同步也有可能被認為是普通 讀任務,是以需要辨別
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{
c->flags |= CLIENT_PENDING_READ;
// 将任務放入處理隊列
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}
那麼 Redis 6 中的 多線程模型 與 Memcached 這一及其經典的多線程網絡程式設計案例中的模型 對比起來有哪些異同呢
Memcached 的線程機制(網上抄的,沒用過)
Memcached 伺服器采用 master-woker 模式進行工作,後再輔以 輔助線程。
服務端采用 socket 與用戶端通訊,主線程、工作線程 采用 pipe管道進行通訊。
主線程采用 libevent 監聽 listen、accept 的讀事件,事件響應後 将連接配接資訊的資料結構封裝起來 根據算法 選擇合适的工作線程,将 連接配接任務攜帶連接配接資訊 分發出去,相應的線程利用連接配接描述符 建立與 用戶端的socket連接配接 并進行後續的存取資料操作。
主線程和工作線程 處理事件流都采用狀态機進行事件轉移。
那麼顯而易見,他們的 線程模型 對比起來:
- 相同點:都采用了 master-worker 這一經典思路
- 不同點:Memcached 執行主邏輯也是在 worker 線程裡,模型更加簡單,不過這也歸功于 Memcached 簡易資料操作的特性産生的天然隔離;而 Redis 把處理邏輯還 交還給 master 線程,雖然一定程度上增加了模型複雜度,但是如果把處理邏輯放在 worker 線程,也很難保證隔離性
總結
Redis 多線程模型 可能無法滿足很多 粉絲設想的 類 Memcached 這種具有完全隔離性的多線程模型,但這也是各方面利弊的權衡,有失必有得
git個人開源spring cloud腳手架,快速上手開發
gitee版本個人spring cloud開源腳手架