天天看點

netty源碼淺讀 - 記憶體管理

netty源碼淺讀 - 記憶體管理

    • ByteBuf
      • ByteBuf資料結構
      • ByteBuf繼承體系
    • ByteBufAllocator
      • ByteBufAllocator具體職能
      • ByteBufAllocator繼承體系
    • PoolArena
      • PoolArena資料結構
      • PoolArena繼承體系
    • netty記憶體可視化
      • PoolChunk
      • PoolSubpage
    • 具體配置設定記憶體流程分析
      • 分析入口
      • 具體分析
      • Chunk配置設定算法分析
      • Chunk配置設定代碼分析
    • 未完成的部分

netty記憶體管理架構借鑒的是JEMalloc配置設定算法。

ByteBuf作為netty記憶體管理對象的抽象,統一了記憶體管理相應的api。如:

read*()、write*()、get*()、set*(),以及markReaderIndex()、markWriterIndex()、resetReaderIndex(),resetWriterIndex()。

netty源碼淺讀 - 記憶體管理

簡言之,一般情況下,ByteBuf提供兩個指針對資料進行讀寫,直覺上從左到右依次是讀指針,寫指針;即0—readerIndex之間是已讀資料(可丢棄的資料),readerIndex—writerIndex之間是可讀資料,writerIndex—capacity之間是可寫資料(實際上是writerIndex到maxCapacity之間)。

這樣看來,read*()方法會向右移動readerIndex,具體移動幾個位元組,要看read何種資料類型,例如調用readByte(),readerIndex會右移一個位元組,并且讀取該位元組的内容并傳回;

同理,write*()方法會向右移動writerIndex;

get*(),set*()會直接針對記憶體數組索引進行操作;

markWriterIndex(),resetWriterIndex()操作的是markedWriterIndex指針,調用markWriterIndex()會将目前writerIndex指派給markedWriterIndex,調用resetWriterIndex()會将writerIndex的值還原為markedWriterIndex,這個兩個方法提供了我們多次寫入同一段記憶體的能力。

同樣,markReaderIndex(),resetReaderIndex()會涉及到一個特殊的指針markedReaderIndex,這兩個方法提供了我們多次讀取同一段記憶體的能力。

netty源碼淺讀 - 記憶體管理

ByteBuf有一個預設骨架實作,即AbstractByteBuf,實作了大部分的功能,留有具體_get*()、_set*()方法提供給子類實作不同的讀取存入記憶體方式。

由AbstractByteBuf派生的具體記憶體管理的類分為三類:

  1. 池化和非池化(Pooled和UnPooled)
  • 帶有Pooled字首的均為池化記憶體持有對象,即:配置設定記憶體時,會從記憶體池中取出适合的一塊記憶體。
  • 帶有UnPooled字首的均為非池化記憶體持有對象。
  1. Unsafe和非Unsafe
  • 記憶體管理類名稱帶有Unsafe的,持有的記憶體是使用jdk底層unsafe配置設定的記憶體
  1. 直接記憶體和堆記憶體(Direct和heap)
  • Direct修飾的記憶體管理類,作為記憶體持有對象時,可以認為持有的記憶體是堆外記憶體,使用時要注意記憶體洩漏問題,如使用完要對記憶體進行釋放;反之,Heap~持有的記憶體是受虛拟機控制的,也就是可以被GC。

整個ByteBuf體系是建立在這三個次元上的,比如:PooledUnsafeDirectByteBuf是池化的,Unsafe的以及Direct的結合。

ByteBufAllocator:記憶體配置設定器。

ByteBuf隻是持有資源,對外提供操作資源的接口,并不關心資源從哪裡來,以及自身何時被初始化等問題,那麼這個時候ByteBufAllocator就閃亮登場啦,ByteBufAllocator負責提供(具體提供方式根據子類的差別而略顯不同)具體記憶體資源以及對ByteBuf進行初始化。

netty源碼淺讀 - 記憶體管理
  1. buffer()/buffer(int initialCapacity)/buffer(int initialCapacity, int maxCapacity):配置設定一個ByteBuf,具體是direct還是heap的依賴具體的實作
  2. ioBuffer()/ioBuffer(int initialCapacity)、ioBuffer(int initialCapacity, int maxCapacity):配置設定一個适合IO的ByteBuf(direct)
  3. heapBuffer()/~/ :配置設定一個堆内的ByteBuf
  4. directBuffer()/~/:配置設定一個持有直接記憶體的ByteBuf

PS:initialCapacity表示初始化 容量,maxCapacity表示最大可用容量

netty源碼淺讀 - 記憶體管理

ByteBufAllocator的抽象實作AbstractByteBufAllocator将ByteBufAllocator相應的api全部實作,僅僅提供了newHeapBuffer(int initialCapacity, int maxCapacity)&newDirectBuffer(int initialCapacity, int maxCapacity)供不同的子類去實作。

  1. PooledByteBufAllocator:如名,池化記憶體配置設定器
  2. UnpooledByteBufAllocator:非池化記憶體配置設定器

PoolArena是池化記憶體的管理者,負責組織協調池化記憶體,ByteBufAllocator配置設定池化記憶體底層依賴PoolArena(ByteBufAllocator組合了PoolArena)。

PoolArena除了維護一些基本的記憶體配置設定相關的資料之外,還維護了六個PoolChunkList,以及兩個PoolSubpage。

資料結構如下圖:

netty源碼淺讀 - 記憶體管理

類中表示為:

netty源碼淺讀 - 記憶體管理

PoolChunkList之前構成雙向連結清單(qInit除外),并且每個PoolChunkList内部的PoolChunk之間也是雙向連結清單的結構。在實際配置設定記憶體活動中,PoolChunk根據内部使用率的不同會在q000~q100連結清單之間移動,這樣做更有利于記憶體的配置設定以及減少記憶體碎片化。

netty源碼淺讀 - 記憶體管理

主要實作為HeapArena和DirectArena

PoolChunk是netty向作業系統申請記憶體的基本機關,預設情況下PoolChunk持有16M記憶體。并且PoolChunk中會維護很多相關的屬性:

  • 所屬的PoolArena
  • 是否池化記憶體
  • 偏移位址(offset)
  • 所屬的PoolChunkList
  • 與目前chunk構成連結清單的前驅和後繼
  • PoolSubpage數組 — 子頁數組,配置設定更小的記憶體
  • memoryMap — 存放使用完全二叉樹表示記憶體的配置設定資訊,且在配置設定過程中一般是以page(8K)為機關。
  • depthMap — 存放節點在二叉樹中的深度資訊

在向netty申請記憶體過程中,如果要申請的記憶體小于8K,則會将Chunk中的葉子節點(page)繼續拆分為SubPage,進而适應小記憶體的配置設定。

同樣的PoolSubpage内部維護了自身相關的一些屬性:

  • 所屬PoolChunk
  • elemSize — 按照elemSize切分page,即每塊subPage的記憶體大小
  • 與目前PoolSubpage構成連結清單的前驅和後繼

netty内部PoolSubpage記憶體規格根據elemSize的大小可以分為Tiny&Small,配置設定記憶體少于512B則為tiny規格,記憶體大于512B少于8k的為small規格。

前面談到的是記憶體管理涉及到的點,要真正的了解netty的記憶體管理還需要将各個點連接配接起來去感受。

public class NettyRAM {


    public static void main(String[] args) {
        PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
        ByteBuf buffer = allocator.buffer(1024*8);
    }
}
           

以池化記憶體配置設定器舉例說明,使用PooledByteBufAllocator.DEFAULT來得到一個PooledByteBufAllocator。

netty源碼淺讀 - 記憶體管理

  1. 走到PooledByteBufAllocator的構造方法,如下:
    public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                                  int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                                  boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
      // 省略一堆我覺得不太重要的代碼
        if (nHeapArena > 0) {
            heapArenas = newArenaArray(nHeapArena);
            List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
            for (int i = 0; i < heapArenas.length; i ++) {
                PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
                        pageSize, maxOrder, pageShifts, chunkSize,
                        directMemoryCacheAlignment);
                heapArenas[i] = arena;
                metrics.add(arena);
            }
            heapArenaMetrics = Collections.unmodifiableList(metrics);
        } else {
            heapArenas = null;
            heapArenaMetrics = Collections.emptyList();
        }
    
        if (nDirectArena > 0) {
            directArenas = newArenaArray(nDirectArena);
            List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
            for (int i = 0; i < directArenas.length; i ++) {
                PoolArena.DirectArena arena = new PoolArena.DirectArena(
                        this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
                directArenas[i] = arena;
                metrics.add(arena);
            }
            directArenaMetrics = Collections.unmodifiableList(metrics);
        } else {
            directArenas = null;
            directArenaMetrics = Collections.emptyList();
        }
        metric = new PooledByteBufAllocatorMetric(this);
    }
               
    通過PooledByteBufAllocator的構造方法可以看出,在構造PooledByteBufAllocator的時候,會做一些必要的事情:
    • 執行個體化threadCache,threadCache是一個PoolThreadLocalCache對象,PoolThreadLocalCache是netty内部FastThreadLocal的子類,類似于jdk的ThreadLocal;
    • 初始化PooledByteBufAllocator内部組合的PoolArena數組(heapArenas&heapArenas,預設8個)
    以初始化directArenas為例分析
  2. 初始化PoolArena

    在初始化PoolArena的時,會初始化PoolChunkList,并将PoolChunkList按指定順序排好。

    netty源碼淺讀 - 記憶體管理
    剛剛初始化的PoolArena是不持有實際記憶體的,即六個PoolChunkList均沒有PoolChunk,到此,PooledByteBufAllocator初始化完成。
  3. 執行配置設定記憶體邏輯
    ByteBuf buffer = allocator.buffer(1028*8);
               
  • 先經過AbstractByteBufAllocator處理
    netty源碼淺讀 - 記憶體管理
    netty源碼淺讀 - 記憶體管理
    • 具體構造ByteBuf通過AbstractByteBufAllocator的抽象方法newDirectBuffer延遲到子類(本例的PooledByteBufAllocator)去實作,無處不在的模闆方法模式~
      netty源碼淺讀 - 記憶體管理

      從threadCache中擷取到directArena(PoolArena$directArena),通過arena進行記憶體的配置設定。

      在我們調用threadCache.get()的時候,實則調用的是FastThreadLocal相應的方法:

      netty源碼淺讀 - 記憶體管理
      進行initialize方法
      netty源碼淺讀 - 記憶體管理
      執行子類複寫的initialValue
      netty源碼淺讀 - 記憶體管理
      首先從PooledByteBufAllocator維護的heapArenas和directArenas中找到目前支援的線程緩存數最少的PoolArena,然後通過得到的一個heapArena和directArena建構出一個PoolThreadCache對象,是以上邊我們可以從threadCache中擷取到directArena,這樣處理也可以有效的避免多線程申請記憶體時競争同一塊記憶體空間的情況。
  • 通過PoolArena(DirectArena的allocate)配置設定
    netty源碼淺讀 - 記憶體管理
    圈中的地方是得到PooledByteBuf的邏輯,内部使用了Recycler,不是重點,先不作分析。重點跟蹤PoolArena#allocate(io.netty.buffer.PoolThreadCache, io.netty.buffer.PooledByteBuf, int)
    • 具體allocate邏輯:
    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    	//step 1 記憶體規則化
        final int normCapacity = normalizeCapacity(reqCapacity);
        //step 2 判斷申請記憶體是否小于一個page(8K)
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);
           // step 3 判斷申請記憶體是否小于512B
           if (tiny) { // < 512
           		//緩存配置設定tinyPage
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
            	// 緩存配置設定smallPage
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }
    
            final PoolSubpage<T> head = table[tableIdx];
    
            /**
             * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
             * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
             */
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                    incTinySmallAllocation(tiny);
                    return;
                }
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
            }
    
            incTinySmallAllocation(tiny);
            return;
        }
        // 所需記憶體是否小于等于16M且大于等于一個page
        if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
                ++allocationsNormal;
            }
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);
        }
    }
               
    由于本例配置設定的記憶體為8k,是以normCapacity <= chunkSize為true,首先會從緩存中讀取(第一次不會從緩存讀取成功),失敗後會走allocateNormal(buf, reqCapacity, normCapacity)配置設定記憶體
    • allocateNormal如下
      netty源碼淺讀 - 記憶體管理
      先從PoolChunkList中尋找chunk進行配置設定(從q050開始),初次配置設定PoolChunkList中均無記憶體,是以走到新增chunk的邏輯。
      • newChunk會調用底層Unsafe申請記憶體
        netty源碼淺讀 - 記憶體管理
      • 申請Chunk之後,執行配置設定邏輯,即根據要申請記憶體的容量通過計算拿到chunk中memoryMap的索引傳回,如何計算,以及memoryMap的作用稍後分析。
        long handle = c.allocate(normCapacity);
                   
      • 然後通過申請的chunk初始化PooledByteBuf(即将chunk中的記憶體片段交由ByteBuf管理)
        c.initBuf(buf, handle, reqCapacity);
                   
      • 最後将新增的Chunk關聯到qInit(PoolChunkList),交由PoolChunkList鍊管理,傳回初始化的PooledByteBuf,流程結束。
        qInit.add(c);
                   

我們向作業系統申請的記憶體是以chunk為機關的(netty預設16M),而事實上,程式中使用記憶體申請時往往不會直接申請大于等于16M的記憶體(也有這種可能,相關邏輯在allocateHuge(buf, reqCapacity)),是以我們可以了解為具體的記憶體配置設定還需要将chunk切分然後配置設定。

具體如何進行配置設定以及如果配置設定到合适的記憶體,就不能随性而為了,因為要考慮到記憶體配置設定的連續性,要盡量避免記憶體碎片化。

netty對chunk的配置設定采用了夥伴配置設定算法(詳情請google):netty中Chunk中持有的連續記憶體是以完全二叉樹的葉子節點表示的,并且二叉樹節點狀态值交由數組維護。

其中memoryMap[]數組維護某一節點是否使用,可以通過下圖來了解

netty源碼淺讀 - 記憶體管理

圖檔來源

在記憶體配置設定過程中,如果要申請一個page(8K)的記憶體,經過運算會定位到樹的第十一層,然後擷取一個未被使用過的節點,将節點在數組(memoryMap[])中的下标傳回,即可。如果要擷取一個16K的記憶體,那麼可以定位到樹的第十層,然後擷取第十層的一個未被使用的節點,将節點在數組中的下标傳回,依次類推。

depthMap[]數組維護節點下标和樹深度的關系,該數組的值一經初始化是不會改變的,初始化時 depthMap和memoryMap的長度以及相同下标的值都是相同的,數組的長度是2

12(4096),相應下标對應的值是目前節點的深度,在depthMap中,如果該節點被配置設定,那麼目前節點對應數組下标的值會被變更為unusable(12),然後目前被配置設定節點的父節點序号作為下标對應數組中的值會變更為兩個子節點中最小的值,遞歸操作,直到根節點。

long handle = c.allocate(normCapacity);
           
  • 進入到PoolChunk的allocate()
    netty源碼淺讀 - 記憶體管理
    目前例子要配置設定的記憶體為8K(一個page),是以會走allocateRun進行配置設定
    • allocateRun負責配置設定大于等于一個page的記憶體
      netty源碼淺讀 - 記憶體管理

      首先根據所需記憶體通過計算公式計算出目前記憶體值對應二叉樹的層級(比如之前談到的,所需記憶體如果是8K的話,會在第十一層【0層為根節點】選取一個節點進行配置設定,因為第十一層節點記憶體均按照8K進行劃分),具體計算方式可以這麼了解:pageShifts預設會被初始化為13,因為樹上的一個葉子節點表示8K,213 = 8192 (8K);log2(normCapacity)中,normCapacity是需要配置設定記憶體的byte值,如果我們需要配置設定8K的記憶體,那麼normCapacity就為8192,即213,那麼有log2213=13;是以當需要配置設定8192位元組時,(log2(normCapacity) - pageShifts) = 0 ;maxOrder為樹的深度,maxOrder - 0 = maxOrder = 11,即當需要配置設定8192位元組時會在第十一層(葉子節點)配置設定一個節點;當我們需要配置設定16K即214時,有log2(normCapacity) = 14,即(log2(normCapacity) - pageShifts) = 1,那麼有maxOrder - 1 = 10,是以得出配置設定16K的記憶體需要再第十層節點進行配置設定,依次類推,自己體會~

      計算出需要配置設定樹的層級之後,最進行最關鍵的一步,即在目前層級上找到可以配置設定的節點!

      • allocateNode(d)配置設定d層節點
        netty源碼淺讀 - 記憶體管理
      1. 首先id指派為1,判斷目前chunk是否可用,value(id)是擷取memoryMap[id]的值,即擷取根節點的值,其中val > d (本例val為0,即chunk未被配置設定),如果成立,那麼說明目前chunk已無可以配置設定目前所需記憶體的節點。
      2. while循環内部從根節點遞歸查找可用節點

        id <<= 1:id左移一位,數值上的含義是id * 21,映射到完全二叉樹中的含義是取得目前節點的左孩子節點

        val = value(id):取得節點儲存的值

        if (val > d):如果目前節點儲存的值大于所需配置設定節點的深度,可知目前節點不具備配置設定的能力

        id ^= 1:id與1做異或運算,數值上的含義是偶數加一,奇數減一,映射到完全二叉樹中就表示為擷取目前節點的兄弟節點。

        while循環内部尋找配置設定節點的過程可以概括為:從根節點開始往下尋找二叉樹中适合的節點

      3. 将配置設定的節點設值為unusable(setValue(id, unusable) )
      4. 更新父節點的狀态(updateParentsAlloc(id))
        netty源碼淺讀 - 記憶體管理
        id無符号右移一位的數學含義是id/21,反映到完全二叉樹中則是,尋找目前節點的父親節點。 然後拿到目前節點以及目前節點兄弟節點的值,作比較,将最小的指派給父親節點。依次,直到更新到根節點。end~

繼續閱讀