天天看點

Redis 多線程網絡模型[譯]

作者:滴水穿石的架構師之路

說明

今天讀了一篇關于【Redis 多線程網絡模型】的國外文章,寫的非常不錯,深入淺出,從單線程、多線程、io多路複用以及源碼來分析Redis的多線程模型,把他簡單翻譯下;

大家可以看下原文: https://www.sobyte.net/post/2022-03/redis-multi-threaded-network-model/

概述

Redis已經成為目前技術選擇中高性能緩存解決方案的行業标準,是以Redis成為後端開發人員的基本技能之一。

Redis本質上是一個web伺服器,對于web伺服器來說,網絡模型是它的本質,如果您了解Web伺服器的網絡模型,就會了解它的本質。

本文分步介紹 Redis 網絡模型,并分析其如何從單線程演進到多線程。此外,我們還分析了 Redis 網絡模型中做出的許多選擇背後的思想,以幫助讀者更好地了解 Redis 網絡模型的設計。

Redis有多快

根據官方基準測試,在具有平均硬體的 Linux 機器上運作的單個 Redis 執行個體通常可以達到簡單指令(O(N) 或O(log(N)) 的 QPS 8w+,而流水線批處理的 QPS 高達 100w,僅從性能來看,Redis 可以稱為高性能緩存解決方案。

Redis為什麼這麼快

Redis的高性能歸功于以下基本要素。

Redis 多線程網絡模型[譯]
  • C實作,雖然C有助于Redis的性能,但是語言不是核心因素
  • 記憶體中的io,RRedis在純記憶體操作方面比其他基于磁盤的資料庫具有天然的性能優勢。
  • I/O 多路複用,用于基于 epoll/select/kqueue 和其他 I/O 多路複用技術的高吞吐量網絡 I/O。
  • 單線程模型,單線程不能利用多核的優勢,但另一方面,它避免了多線程頻繁的上下文切換,以及鎖等同步機制的開銷。

為什麼 Redis 選擇單線程?

  • Redis 的核心網模型是單線程的,一開始引起了很多質疑,而 Redis 官方對此的回答是:
CPU 成為 Redis 瓶頸的情況并不常見,因為通常 Redis 要麼是記憶體限制,要麼是網絡綁定的。例如,使用流水線 在普通 Linux 系統上運作的 Redis 每秒甚至可以傳遞 100 萬個請求,是以如果您的應用程式主要使用 O(N) 或 O(log(N)) 指令,它幾乎不會使用太多 CPU。
  • 從本質上講,這意味着 CPU 通常不是資料庫的瓶頸,因為大多數請求不是 CPU 密集型的,而是 I/O 密集型的。
  • 特别是在 Redis 的情況下,如果你不考慮像 RDB/AOF 這樣的持久性方案,Redis 是一個完全在記憶體中的操作,它非常快。
  • Redis 真正的性能瓶頸是網絡 I/O,即用戶端和伺服器之間網絡傳輸的延遲,是以 Redis 選擇單線程 I/O 複用來實作其核心網絡模型。
  • 以上是比較籠統的官方回答,但其實選擇單線程的更具體原因可以總結如下:

避免過多的上下文切換開銷

  • 在多線程排程過程中,需要在CPU之間切換線程上下文,上下文切換涉及一系列寄存器替換,程式堆棧複位甚至CPU緩存和TLB快速表報廢,如程式計數器,堆棧指針和程式狀态字。由于單個程序中的多個線程共享程序位址空間,是以線程上下文比程序上下文小得多,并且在跨程序排程的情況下,需要切換整個程序位址空間。
  • 在單線程排程的情況下,可以避免程序内頻繁的線程切換開銷,因為程式始終在程序中的單個線程内運作,并且沒有多線程切換方案。

避免同步機制的開銷

如果 Redis 選擇多線程模型,并且因為 Redis 是一個資料庫,難免會涉及到底層的資料同步問題,這必然會引入一些同步機制,比如鎖,我們知道 Redis 提供的不僅僅是簡單的鍵值資料結構,還有清單、集合、哈希等豐富的資料結構。 不同的資料結構對同步通路的鎖定粒度不同,這可能會導緻資料操作過程中大量的鎖定和解鎖開銷,增加程式複雜性并降低性能。

簡單且可維護

Redis 的作者 Salvatore Sanfilippo(别名 antirez)在 Redis 的設計和代碼中有着一種近乎偏執的簡單哲學,當你閱讀 Redis 源代碼或向其送出 PR 時,你可以感受到這種偏執。是以,在早期的 Redis 中,簡單易維護的代碼必然是其核心指導原則之一。而多線程的引入不可避免地增加了代碼複雜性并降低了可維護性。

事實上,多線程程式設計并不完美。首先,多線程程式設計的引入将不再保持代碼邏輯串行,代碼執行的順序将變得不可預測,如果不小心會導緻各種并發程式設計問題;其次,多線程模式也使調試更加複雜和麻煩。網絡上有一張有趣的圖檔,生動地描繪了并發程式設計面臨的困境。

您對多線程程式設計的期望與實際的多線程程式設計。

Redis 多線程網絡模型[譯]

如果 Redis 使用多線程模型,則必須将所有底層資料結構實作為線程安全。這反過來又使 Redis 實作更加複雜。

簡而言之,Redis 選擇單線程是在保持代碼簡單和可維護性與保持足夠性能之間的權衡。

Redis 真的是單線程的嗎?

在讨論這個問題之前,我們需要澄清“單線程”概念的界限:它是否涵蓋核心網絡模型或整個 Redis?如果是前者,那麼答案是肯定的。網絡模型是單線程的,直到 Redis 在 v6.0 中正式引入多線程;如果是後者,那麼答案是否定的。Redis 早在 v4.0 中就引入了多線程。

是以,在讨論 Redis 中的多線程時,在 Redis 版本中描述兩個要點非常重要。

  • Redis v4.0(為異步任務引入了多線程)
  • Redis v6.0(在網絡模型中正式實作 I/O 多線程)

單線程事件循環

  • 我們先來剖析一下 Redis 的核心網絡模型。從 Redis v1.0 到 v6.0,Redis 的核心網絡模型一直是典型的單 Reactor 模型:使用 epoll/select/kqueue 等多路複用技術在單線程事件循環中處理事件(用戶端請求),最後将響應資料寫回用戶端。
Redis 多線程網絡模型[譯]
  • 這裡有幾個核心概念需要學習:
  • 用戶端:用戶端對象,Redis 是典型的 CS 架構(用戶端 <–> 伺服器),其中用戶端通過套接字與伺服器建立網絡通道,然後發送請求的指令,伺服器執行請求的指令并回複。Redis 使用結構用戶端來存儲用戶端的所有相關資訊,包括但不限于 wrapped socket connection -- *conn 、 currently selected database pointer -- *db 、 read buffer -- querybuf 、 write buffer -- buf 、 write data linked list -- reply 等。
  • aeApiPoll:I/O複用API,是基于epoll_wait/select/kevent等系統調用進行包裝,監聽要觸發的讀寫事件,然後進行處理,它是Event Loop(Event Loop)的核心功能,是事件驅動運作的基礎。
  • acceptTcpHandler:連接配接應答處理器,底層使用系統調用 accept 接受來自用戶端的新連接配接,并将新連接配接注冊綁定指令讀取處理器用于後續處理新用戶端TCP連接配接;除了這個處理器之外,還有一個相應的 acceptUnixHandler 用于處理Unix域套接字和 acceptTLSHandler 用于處理TLS加密連接配接。readQueryFromClient :一個指令讀取處理器,用于解析和執行用戶端請求的指令。
  • beforeSleep:在事件循環進入aeApiPoll并等待事件到達之前執行的函數。它包含一些例行任務,例如将來自 client->buf 或 client->reply (此處需要兩個緩沖區)的響應寫回用戶端,将 AOF 緩沖區中的資料儲存到磁盤等。還有一個在aeApiPoll之後執行的afterSleep函數。sendReplyToClient:指令回複處理程式,當一個事件循環後寫緩沖區中仍有資料時,該處理程式将被注冊并綁定到相應的連接配接,當連接配接觸發寫就緒事件時,它會将寫緩沖區中的剩餘資料寫回用戶端。
  • Redis 在内部實作了基于 epoll/select/kqueue/evport 的高性能事件庫 AE,以實作 Linux/MacOS/FreeBSD/Solaris 的高性能事件循環模型。Redis的核心網絡模型正式建立在AE之上,包括I/O複用,以及各種處理器綁定的注冊,都是基于AE。

在這一點上,我們可以描述用戶端從 Redis 請求指令的工作方式。

- Redis 伺服器啟動,打開主線程事件循環,将 acceptTcpHandler 連接配接應答處理器注冊到與使用者配置的偵聽端口對應的檔案描述符,并等待新連接配接到達。 - 在用戶端和伺服器之間建立網絡連接配接。

- 調用 acceptTcpHandler ,主線程使用 AE 的 API 将 readQueryFromClient 指令讀取處理器綁定到新連接配接對應的檔案描述符,并初始化 client 綁定此用戶端連接配接。

- 用戶端發送請求指令,觸發讀取就緒事件,主線程調用 readQueryFromClient 将用戶端通過套接字發送的指令讀取到 client->querybuf 讀取緩沖區中。

- 接下來調用 processInputBuffer ,其中 processInlineBuffer 或 processMultibulkBuffer 用于根據 Redis 協定解析指令,最後調用 processCommand 執行指令。

- 根據所請求指令的類型(SET、GET、DEL、EXEC 等),配置設定合适的指令執行器來執行它,最後調用 addReply 家族中的一系列函數将響應資料寫入對應 client 的寫緩沖區: client->buf 或 client->reply , client->buf 是首選寫出緩沖區,固定大小為 16KB, 一般可以緩沖足夠的響應資料,但如果用戶端在時間視窗内需要非常大的響應,那麼它會自動切換到 client->reply 連結清單,理論上可以容納無限量的資料(受機器實體記憶體限制)最後将 client 添加到 LIFO 隊列 clients_pending_write 中。

- 在事件循環中,主線程執行 beforeSleep –> handleClientsWithPendingWrites ,周遊 clients_pending_write 隊列,調用 writeToClient 将 client 的寫緩沖區中的資料傳回給用戶端,如果寫緩沖區中還有剩餘資料,則注冊 sendReplyToClient 指令以回複處理器,并為連接配接提供寫就緒事件, 并等待用戶端寫入,然後再繼續寫回事件循環中的剩餘響應資料。

對于那些想要利用多核性能的人來說,官方的 Redis 解決方案簡單而殘酷:在同一台機器上運作更多 Redis 執行個體。事實上,為了確定高可用性,線上業務不太可能處于獨立模式。更常見的是使用多節點、多資料分片的 Redis 分布式叢集,以提高性能,保證高可用性。

多線程異步任務

以上是 Redis 的核心網絡模型,直到 Redis v6.0 才轉變為多線程模型。但這并不意味着 Redis 一直是單線程的。

Redis 在 v4.0 中引入了多線程來做一些異步操作,主要是針對非常耗時的指令。通過異步執行這些指令,可以避免阻塞單線程事件循環。

我們知道 Redis DEL 指令用于删除一個或多個存儲的鍵值,它是一個阻塞指令。在大多數情況下,您要删除的鍵不會存儲很多值,最多幾十個或幾百個對象,是以可以快速執行。但是,如果要删除包含數百萬個對象的非常大的鍵值對,則此指令可能會阻塞至少幾秒鐘,并且由于事件循環是單線程的,是以它将阻止随後的其他事件,進而導緻吞吐量降低。

Redis的作者Antirez對解決這個問題進行了大量思考。起初,他提出了一個增量解決方案:使用計時器和資料光标,他一次删除少量資料,比如 1000 個對象,最終清除所有資料。但是這個解決方案有一個緻命的缺陷:如果其他用戶端繼續将資料寫入一個同時被逐漸删除的密鑰,并且删除率跟不上正在寫入的資料,那麼記憶體将被無休止地消耗,這可以通過一個聰明的解決方案來解決,但這個實作使 Redis 更加複雜。多線程似乎是一個滴水不漏的解決方案:簡單易懂。是以,最終,antirez 選擇引入多線程來實作此類非阻塞指令。 更多Antirez對此的想法可以在他的部落格中找到:懶惰的Redis更好Redis。

是以,在 Redis v4.0 之後,添加了一些非阻塞指令,如 UNLINK 、 FLUSHALL ASYNC 、 FLUSHDB ASYNC 。

Redis 多線程網絡模型[譯]

UNLINK 指令其實是 DEL 的異步版本,它不會同步删除資料,隻是暫時從密鑰空間中删除密鑰,然後将任務添加到異步隊列中,最後背景線程會删除它。但這裡我們需要考慮一種情況,如果我們使用 UNLINK 删除一個非常小的鍵,異步方式做會有更多的開銷,是以它會先計算一個開銷門檻值,隻有當這個值大于 64 時我們才會使用異步方式删除密鑰, 對于清單、集合和哈希等基本資料類型,門檻值是其中存儲的對象數存儲的對象數。

Redis 多線程網絡模型

如前所述,Redis 最初選擇單線程網絡模型的原因是因為 CPU 通常不是性能瓶頸,瓶頸往往是記憶體和網絡,是以單線程就足夠了。那麼為什麼 Redis 現在引入多線程呢?一個簡單的事實是,Redis的網絡I/O瓶頸越來越明顯。

随着網際網路的快速增長,網際網路業務系統處理的線上流量越來越多,而 Redis 的單線程模式導緻系統在網絡 I/O 上消耗大量 CPU 時間,進而降低吞吐量。有兩種方法可以提高 Redis 的性能。 - 優化的網絡 I/O 子產品 - 提高機器記憶體讀寫速度

後者取決于硬體的發展,暫時無法解決。是以,我們隻能從前者入手,網絡I/O的優化可以分為兩個方向。 - 零拷貝技術或 DPDK 技術 - 利用多核

零拷貝技術有其局限性,無法完全适應 Redis 等複雜的網絡 I/O 場景。(有關 CPU 時間和 Linux 零拷貝技術的網絡 I/O 消耗的更多資訊,請閱讀上一篇文章。通過繞過NIC I/O繞過核心堆棧的DPDK技術過于複雜,需要核心甚至硬體支援。

是以,利用多核是優化網絡 I/O 的最經濟高效的方法。

在 6.0 版本之後,Redis 正式将多線程引入核心網絡模型,也稱為 I/O 線程,而 Redis 現在有了真正的多線程模型。在上一節中,我們了解了 Redis 6.0 之前的單線程事件循環模型,它實際上是一個非常經典的 Reactor 模型。

Redis 多線程網絡模型[譯]
Redis 多線程網絡模型[譯]

Reactor 模式用于 Linux 平台上大多數主流的高性能網絡庫/架構,如 netty、libevent、libuv、POE(Perl)、Twisted (Python) 等。

反應器模式本質上是指使用 I/O multiplexing (I/O multiplexing) + non-blocking I/O (non-blocking I/O) 模式。

在 6.0 版本之前,Redis 的核心網絡模型是單個 Reactor 模型:所有事件都在單個線程中處理,盡管在 4.0 版本中引入了多線程,但它更像是針對特定場景(删除超大鍵值等)的更新檔,不能被視為核心網絡模型的多線程。

一般來說,單反應器模型在引入多線程後演變為多反應器模型,基本工作模型如下。

![[Pasted image 20230513094632.png]]

Redis 多線程網絡模型[譯]

此模式不是單線程事件循環,而是有多個線程(子反應器),每個線程維護一個單獨的事件循環,主反應器接收新連接配接并将其分發給子反應器進行獨立處理,子反應器将響應寫回用戶端。

多反應器模式通常可以等同于Master-Workers模式,例如Nginx和Memcached,它們使用這種多線程模型,盡管實作細節因項目而異,但模式通常是一緻的。

設計思維

Redis 也實作了多線程,但不是在标準的多反應器/主工作線程模式中,原因我們将在後面分析。現在,我們來看一下 Redis 多線程網絡模型的總體設計。

Redis 多線程網絡模型[譯]
  1. Redis 伺服器啟動,打開主線程事件循環,将 acceptTcpHandler 連接配接應答處理器注冊到與使用者配置的偵聽端口對應的檔案描述符,并等待新連接配接到達。
  2. 在用戶端和伺服器之間建立網絡連接配接。
  3. 調用 acceptTcpHandler ,主線程使用 AE 的 API 将 readQueryFromClient 指令讀取處理器綁定到新連接配接對應的檔案描述符,并初始化 client 綁定此用戶端連接配接。
  4. 用戶端發送請求指令,觸發讀取就緒事件。伺服器的主線程不是通過套接字讀取用戶端的請求指令,而是首先将 client 放入 LIFO 隊列 clients_pending_read 中。
  5. 在事件循環中,主線程執行 beforeSleep –> handleClientsWithPendingReadsUsingThreads ,使用輪詢負載均衡政策在 I/O 線程之間均勻配置設定 clients_pending_read 隊列中的連接配接 I/O 線程通過套接字讀取用戶端請求的指令,将其存儲在 client->querybuf 中并解析第一個指令,但不執行它, 而主線程正忙于輪詢并等待所有 I/O 線程完成讀取任務。
  6. 當主線程和所有 I/O 線程完成讀取後,主線程完成繁忙輪詢,周遊 clients_pending_read 隊列,執行所有用戶端連接配接的請求指令,并首先調用 processCommandResetClient 以執行已解析的第一個指令。然後調用 processInputBuffer 解析并執行用戶端連接配接的所有指令,使用 processInlineBuffer 或 processMultibulkBuffer 根據 Redis 協定解析指令,最後調用 processCommand 執行名為 processCommand 的指令來執行指令。
  7. 根據所請求指令的類型(SET、GET、DEL、EXEC 等),配置設定相應的指令執行器來執行它,最後調用 addReply 家族中的一系列函數将響應資料寫入對應的 client 寫出緩沖區: client->buf 或 client->reply , client->buf 是首選的寫出緩沖區,固定大小為 16KB, 它通常會緩沖足夠的響應資料,但如果用戶端需要在時間範圍内響應大量資料,則會自動切換到 client->reply 連結清單。理論上,連結清單可以容納無限量的資料(受機器實體記憶體的限制),最後将 client 添加到 LIFO 隊列 clients_pending_write 中。
  8. 在事件循環中,主線程執行 beforeSleep –> handleClientsWithPendingWritesUsingThreads ,使用輪詢負載平衡政策将 clients_pending_write 隊列中的連接配接均勻分布到 I/O 線程和主線程本身。I/O 線程通過調用 writeToClient 将 client 的寫緩沖區中的資料寫回用戶端,主線程忙于輪詢所有 I/O 線程以完成其寫入任務,主線程忙于輪詢并等待所有 I/O 線程完成其寫入任務。
  9. 當主線程和所有 I/O 線程完成寫出後,主線程完成繁忙輪詢并周遊 clients_pending_write 隊列。如果 client 寫入緩沖區中仍有資料,它将 sendReplyToClient 注冊到該連接配接的寫就緒事件,并等待用戶端寫入,然後再繼續寫回事件循環中的剩餘響應資料。

這裡的大多數邏輯與之前的單線程模型中相同,唯一的變化是異步讀取用戶端請求并将響應資料寫回 I/O 線程的邏輯。這裡需要特别注意的是:I/O 線程隻讀取和解析用戶端指令,并不實際執行它們,用戶端指令的執行最終在主線程上完成。

源代碼剖析

以下所有代碼均基于 Redis v6.0.10 版本。

多線程初始化

void initThreadedIO(void) {
    server.io_threads_active = 0; /* We start with threads not active. */

    // 如果使用者隻配置了一個 I/O 線程,則不會建立新線程(效率低),直接在主線程裡處理 I/O。
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    // 根據使用者配置的 I/O 線程數,啟動線程。
    for (int i = 0; i < server.io_threads_num; i++) {
        // 初始化 I/O 線程的本地任務隊列。
        io_threads_list[i] = listCreate();
        if (i == 0) continue; // 線程 0 是主線程。

        // 初始化 I/O 線程并啟動。
        pthread_t tid;
        // 每個 I/O 線程會配置設定一個本地鎖,用來休眠和喚醒線程。
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        // 每個 I/O 線程配置設定一個原子計數器,用來記錄目前遺留的任務數量。
       io_threads_pending[i] = 0;
        // 主線程在啟動 I/O 線程的時候會預設先鎖住它,直到有 I/O 任務才喚醒它。
        pthread_mutex_lock(&io_threads_mutex[i]);
        // 啟動線程,進入 I/O 線程的主邏輯函數 IOThreadMain。
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}
           
  1. initThreadedIO 在 Redis 伺服器啟動時的初始化工作結束時調用,以初始化 I/O 多線程并啟動它。
  2. Redis 多線程模式預設關閉,需要使用者在 redis.conf 配置檔案中啟用。
io-threads 4
io-threads-do-reads yes
           

讀取請求

  • 當用戶端發送請求指令時,它會在 Redis 主線程中觸發事件循環,并回調指令處理程式 readQueryFromClient 。在以前的單線程模型中,此方法将直接讀取和分析用戶端指令并執行它。但是,在多線程模式下, client 将添加到 clients_pending_read 任務隊列中,然後将主線程配置設定給 I/O 線程以讀取用戶端請求的指令。
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;

    // 檢查是否開啟了多線程,如果是則把 client 加入異步隊列之後傳回。
    if (postponeClientRead(c)) return;
    
    // 省略代碼,下面的代碼邏輯和單線程版本幾乎是一樣的。
    ... 
}

int postponeClientRead(client *c) {
    // 當多線程 I/O 模式開啟、主線程沒有在處理阻塞任務時,将 client 加入異步隊列。
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        // 給 client 打上 CLIENT_PENDING_READ 辨別,表示該 client 需要被多線程處理,
        // 後續在 I/O 線程中會在讀取和解析完用戶端指令之後判斷該辨別并放棄執行指令,讓主線程去執行。
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
       return 0;
    }
}
           

然後,主線程在事件循環的 beforeSleep() 方法中調用

int handleClientsWithPendingReadsUsingThreads(void) {
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);

    // 周遊待讀取的 client 隊列 clients_pending_read,
    // 通過 RR 輪詢均勻地配置設定給 I/O 線程和主線程自己(編号 0)。
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // 設定目前 I/O 操作為讀取操作,給每個 I/O 線程的計數器設定配置設定的任務數量,
    // 讓 I/O 線程可以開始工作:隻讀取和解析指令,不執行。
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    // 主線程自己也會去執行讀取用戶端請求指令的任務,以達到最大限度利用 CPU。
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    // 忙輪詢,累加所有 I/O 線程的原子任務計數器,直到所有計數器的遺留任務數量都是 0,
    // 表示所有任務都已經執行完成,結束輪詢。
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshed\n");

    // 周遊待讀取的 client 隊列,清除 CLIENT_PENDING_READ 和 CLIENT_PENDING_COMMAND 标記,
    // 然後解析并執行所有 client 的指令。
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);

        if (c->flags & CLIENT_PENDING_COMMAND) {
            c->flags &= ~CLIENT_PENDING_COMMAND;
            // client 的第一條指令已經被解析好了,直接嘗試執行。
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid
                 * processing the client later. So we just go
                 * to the next. */
                continue;
            }
        }
        processInputBuffer(c); // 繼續解析并執行 client 指令。

        // 指令執行完成之後,如果 client 中有響應資料需要回寫到用戶端,則将 client 加入到待寫出隊列 clients_pending_write
        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            clientInstallWriteHandler(c);
    }

    /* Update processed count on server */
    server.stat_io_reads_processed += processed;

    return processed;
}
           

這裡的核心工作是。

  • 循環通路要讀取的 client 隊列 clients_pending_read ,并将所有任務配置設定給 I/O 線程和主線程,以通過 RR 政策讀取和分析用戶端指令。
  • 忙于輪詢,等待所有 I/O 線程完成其任務。
  • 最後循環通路 clients_pending_read 并執行所有 client 指令。

寫回響應

讀取、解析并執行指令後,用戶端指令的響應資料已存儲在 client->buf 或 client->reply 中。接下來,您需要将響應資料寫回用戶端。同樣,在 beforeSleep 中,主線程調用 handleClientsWithPendingWritesUsingThreads 。

int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    // 如果使用者設定的 I/O 線程數等于 1 或者目前 clients_pending_write 隊列中待寫出的 client
    // 數量不足 I/O 線程數的兩倍,則不用多線程的邏輯,讓所有 I/O 線程進入休眠,
    // 直接在主線程把所有 client 的相應資料回寫到用戶端。
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    // 喚醒正在休眠的 I/O 線程(如果有的話)。
    if (!server.io_threads_active) startThreadedIO();

    if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);

    // 周遊待寫出的 client 隊列 clients_pending_write,
    // 通過 RR 輪詢均勻地配置設定給 I/O 線程和主線程自己(編号 0)。
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;

        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }

        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // 設定目前 I/O 操作為寫出操作,給每個 I/O 線程的計數器設定配置設定的任務數量,
    // 讓 I/O 線程可以開始工作,把寫出緩沖區(client->buf 或 c->reply)中的響應資料回寫到用戶端。
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    // 主線程自己也會去執行讀取用戶端請求指令的任務,以達到最大限度利用 CPU。
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    // 忙輪詢,累加所有 I/O 線程的原子任務計數器,直到所有計數器的遺留任務數量都是 0。
    // 表示所有任務都已經執行完成,結束輪詢。
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finshed\n");

    // 最後再周遊一次 clients_pending_write 隊列,檢查是否還有 client 的寫出緩沖區中有殘留資料,
    // 如果有,那就為 client 注冊一個指令回複器 sendReplyToClient,等待用戶端寫就緒再繼續把資料回寫。
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        // 檢查 client 的寫出緩沖區是否還有遺留資料。
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);

    /* Update processed count on server */
    server.stat_io_writes_processed += processed;

    return processed;
}
           

這裡的核心工作是。

  1. 檢查目前任務負載,如果目前任務數不足以在多線程模式下處理,請休眠 I/O 線程并将響應資料直接同步寫回用戶端。
  2. 喚醒處于休眠狀态的 I/O 線程(如果有)。
  3. 循環通路 client 隊列 clients_pending_write 并将所有任務配置設定給 I/O 線程和主線程,以通過 RR 政策将響應資料寫回用戶端。
  4. 忙于輪詢等待所有 I/O 線程完成其任務。
  5. 最後周遊 clients_pending_write ,為那些仍有響應資料的 clients 注冊指令回複處理程式 sendReplyToClient ,并等待用戶端可寫,然後再繼續寫回事件循環中的剩餘響應資料。

I/O 線程主邏輯

void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;
    char thdname[16];

    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    // 設定 I/O 線程的 CPU 親和性,盡可能将 I/O 線程(以及主線程,不在這裡設定)綁定到使用者配置的
    // CPU 清單上。
    redisSetCpuAffinity(server.server_cpulist);
    makeThreadKillable();

    while(1) {
        // 忙輪詢,100w 次循環,等待主線程配置設定 I/O 任務。
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        // 如果 100w 次忙輪詢之後如果還是沒有任務配置設定給它,則通過嘗試加鎖進入休眠,
        // 等待主線程配置設定任務之後調用 startThreadedIO 解鎖,喚醒 I/O 線程去執行。
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(io_threads_pending[id] != 0);

        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));


        // 注意:主線程配置設定任務給 I/O 線程之時,
        // 會把任務加入每個線程的本地任務隊列 io_threads_list[id],
        // 但是當 I/O 線程開始執行任務之後,主線程就不會再去通路這些任務隊列,避免資料競争。
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            // 如果目前是寫出操作,則把 client 的寫出緩沖區中的資料回寫到用戶端。
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
              // 如果目前是讀取操作,則socket 讀取用戶端的請求指令并解析第一條指令。
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        // 所有任務執行完之後把自己的計數器置 0,主線程通過累加所有 I/O 線程的計數器
        // 判斷是否所有 I/O 線程都已經完成工作。
        io_threads_pending[id] = 0;

        if (tio_debug) printf("[%ld] Done\n", id);
    }
}
           
  • 啟動 I/O 線程後,它首先進入繁忙輪詢以确定原子計數器中的任務數。如果它不為零,則主線程已為其配置設定了一個任務并開始執行它,否則它會忙于輪詢一百萬次并等待。如果仍然為 0,則嘗試添加本地鎖,因為主線程在啟動所有 I/O 線程時已經提前鎖定了它們的本地鎖,是以 I/O 線程将休眠并等待主線程喚醒。
  • 主線程将嘗試在每個事件循環中調用 startThreadedIO ,以喚醒 I/O 線程以執行任務。如果收到用戶端請求指令,I/O 線程将被喚醒并開始工作,以執行讀取和分析指令或根據主線程設定的 io_threads_op 标志寫回響應資料的任務。收到主線程的通知後,I/O 線程會周遊自己的本地任務隊列 io_threads_list[id] ,取出一個 client 來執行任務。如果目前操作是寫入操作,請調用 writeToClient 将響應資料從 client->buf 或 client->reply 通過套接字寫回用戶端。如果目前操作是讀取操作,調用 readQueryFromClient 通過套接字讀取用戶端指令,存儲在 client->querybuf 中,然後調用 processInputBuffer 解析指令,隻會以第一個指令結束,然後完成而不執行它。執行所有任務後,将其自己的原子計數器設定為 0,以告知主線程它已完成其工作。
void processInputBuffer(client *c) {

...

    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED) break;

        /* Don't process more buffers from clients that have already pending
         * commands to execute in c->argv. */
        if (c->flags & CLIENT_PENDING_COMMAND) break;
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            // 判斷 client 是否具有 CLIENT_PENDING_READ 辨別,如果是處于多線程 I/O 的模式下,
            // 那麼此前已經在 readQueryFromClient -> postponeClientRead 中為 client 打上該辨別,
            // 則立刻跳出循環結束,此時第一條指令已經解析完成,但是不執行指令。
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            // 執行用戶端指令
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                return;
            }
        }
    }

...
}
           
  • 首次啟動 I/O 線程時,應特别注意目前線程的 CPU 關聯性,即将目前線程綁定到使用者配置的 CPU。啟動主 Redis 伺服器線程(即 Redis 的核心網絡模型)時設定相同的 CPU 關聯。Redis 本身是一個對吞吐量和延遲極其敏感的系統,是以使用者需要 Redis 對 CPU 資源進行更精細的控制。這裡有兩個主要注意事項:CPU 緩存和 NUMA 體系結構。
  • 首先是CPU緩存(我們談論的是将一級緩存和二級緩存內建到CPU中的硬體架構)。想象一下這樣的場景:主 Redis 程序在 CPU-1 上運作,向用戶端提供資料,Redis 啟動一個子程序以實作資料持久性(BGSAVE 或 AOF)。系統排程後,子程序接管主程序的CPU-1,主程序排程在CPU-2上運作,導緻CPU-1緩存中的指令和資料被消除,CPU-2将指令和資料重新加載到自己的本地緩存中,浪費CPU資源,降低性能。
Redis 多線程網絡模型[譯]

是以,通過設定 CPU 關聯,Redis 可以将主程序/線程和子程序/線程隔離,将它們綁定到不同的核心,使它們不會互相幹擾,進而可以有效提高系統性能。

第二個考慮因素基于 NUMA 體系結構。在NUMA系統下,記憶體控制器晶片內建在處理器内部,形成CPU本地記憶體。對本地記憶體的通路僅通過記憶體通道而不是通過系統總線,大大降低了通路延遲,同時多個處理器通過QPI資料鍊路互連,跨NUMA節點的記憶體通路開銷遠高于本地記憶體通路。

Redis 多線程網絡模型[譯]

是以,Redis 還可以通過設定 CPU 關聯性來大幅提升性能,使主程序/線程盡可能在 NUMA 節點上的固定 CPU 上運作,使用更多的本地記憶體,而不是跨節點通路資料。 有關 NUMA 的更多資訊,請自行檢視。以後有時間我會單獨寫一篇關于它的文章。

閱讀過源代碼的讀者可能會想知道的最後一點是,Redis 似乎并沒有在多線程模式下鎖定資料。事實上,Redis 的多線程模型自始至終都是無鎖的,這是通過原子操作 + 交錯通路實作的,主線程和 I/O 線程之間共享三個變量: io_threads_pending 計數器、 io_threads_op I/O 辨別符和 io_threads_list list 的線程本地任務隊列。

io_threads_pending 是一個不需要鎖保護的原子變量,而 io_threads_op 和 io_threads_list 是兩個變量,它們通過控制主線程和 I/O 線程之間的交錯通路來規避共享資料競争問題:I/O 線程通過繁忙的輪詢和鎖定休眠啟動并等待來自主線程的信号。啟動後,I/O 線程通過繁忙的輪詢和鎖定睡眠等待來自主線程的信号。 在喚醒 I/O 線程開始工作之前,它不會通路自己的本地任務隊列 io_threads_list[id] ,直到它将所有任務配置設定給每個 I/O 線程的本地隊列,并且主線程隻會通路自己的本地任務隊列 io_threads_list[ 0] ,不會通路 I/O 線程的本地隊列, 這可確定主線程始終在 I/O 線程之前通路 io_threads_list ,并且永遠不會再次通路,進而確定交錯通路。同樣,對于 io_threads_op ,主線程在喚醒 I/O 線程之前設定 io_threads_op 的值,并且在 I/O 線程運作時不會再次通路此變量。

Redis 多線程網絡模型[譯]

性能改進

Redis将核心網絡模型轉換為多線程模式,以追求最終的性能提升,是以基準資料是真實可靠的。

Redis 多線程網絡模型[譯]

測試資料表明,使用多線程模式時,Redis的性能可以顯著提高兩倍。更詳細的性能分析資料可以在這篇文章中找到:基于實驗的 Redis 多線程 I/O 基準測試。

以下是 Mito 技術團隊測算的新舊 Redis 版本性能對比,僅供參考。

Redis 多線程網絡模型[譯]

模型缺陷

首先,正如我前面提到的,Redis 的多線程網絡模型實際上并不是标準的多反應器/Master-Worker 模型,并且與其他主流開源 Web 伺服器模型不同。勞工模型,子反應堆/勞工将完成 network read -> data parsing -> command execution -> network write 流程,主反應堆/主反應堆隻負責配置設定任務。在 Redis 的多線程場景中,I/O 線程的任務隻是通過套接字讀取和分析用戶端請求,而不是實際執行指令。所有用戶端指令最終都需要傳回給主線程執行,是以多核的使用率不是很高。此外,每次主線程都必須忙于輪詢所有 I/O 線程以完成其任務,然後再繼續執行邏輯的其餘部分。

我認為 Redis 設計多線程網絡模型的主要原因是為了保持相容性,因為 Redis 以前是單線程的,所有用戶端指令都在單線程事件循環中執行,是以 Redis 中的所有資料結構都是非線程安全的。現在有了多線程,如果我們遵循标準的多反應器/主工作線程模型,所有内置的資料結構都必須重構才能實作線程安全,這是大量的工作和麻煩。

是以,在我看來,Redis 目前的多線程解決方案更像是一種折衷:它保持了原始系統的相容性,同時利用多個核心來提高 I/O 性能。

其次,目前 Redis 的多線程模型在主線程和 I/O 線程之間的通信過于簡單和殘酷:忙于輪詢和鎖定,因為通過旋轉繁忙輪詢等待會導緻 Redis 偶爾出現高占用率,導緻啟動時和運作時短暫的 CPU 空閑。這種通信機制的最終實作看起來非常不直覺和簡單,我們希望 Redis 稍後會在目前解決方案的基礎上進行改進。

總結

Redis 是緩存系統的事實标準,其基本原則值得深入研究。但是作者antirez,一個簡單的開發者,對向Redis添加任何新功能都非常謹慎,是以核心Redis網絡模型最終在Redis最初釋出十年後轉換為多線程模型,在此期間,許多Redis多線程替代方案甚至誕生了。盡管Antirez一直在推遲多線程解決方案,但它從未停止思考多線程的可行性。Redis多線程網絡模型的轉型并非一蹴而就,涉及項目的方方面面,是以我們可以看到最終的Redis解決方案并不完美,沒有采用主流的多線程設計。

讓我們回顧一下 Redis 多線程網絡模型的設計。

  • 使用 I/O 線程的多線程網絡 I/O,其中 I/O 線程僅負責網絡 I/O 和指令解析,不執行用戶端指令。
  • 使用原子操作 + 交錯通路實作無鎖多線程模型。
  • 通過設定 CPU 關聯将主程序與其他子程序隔離開來,以便多線程網絡模型可以最大限度地提高性能。

通讀本文後,相信讀者應該能夠了解計算機領域涉及的各種技術,以實作良好的網絡系統:設計模式、網絡 I/O、并發程式設計、作業系統底層,甚至計算機硬體。它還需要對項目疊代和重構采取謹慎的方法,并對技術解決方案進行深入思考,而不僅僅是編寫好代碼的困難部分。

繼續閱讀