天天看點

Netty 系列三(ByteBuf).

一、概述和原理

    網絡資料傳輸的基本機關總是位元組,Netty 提供了 ByteBuf 作為它的位元組容器,既解決了 JDK API 的局限性,又為網絡應用程式提供了更好的 API,ByteBuf 的優點:

1、可以被使用者自定義的緩沖區類型擴充

2、通過内置的複合緩沖區類型實作了透明的零拷貝

3、容量可以按需增長

4、在讀和寫這兩種模式之間切換不需要調用 ByteBuffer 的 flip()方法

5、讀和寫使用了不同的索引

6、支援方法的鍊式調用

7、支援引用計數

8、支援池化

    ByteBuf通過兩個索引(readerIndex、writerIndex)劃分為三個區域:

Netty 系列三(ByteBuf).

1、任何名稱以 read 或者 skip 開頭的操作都将檢索或者跳過位于目前readerIndex 的資料,并且将它增加已讀位元組數;任何名稱以 write 開頭的操作都将從目前的 writerIndex 處開始寫資料,并将它增加已經寫入的位元組數。readerIndex 不能超過 writerIndex。

2、如果被調用的方法需要一個 ByteBuf 參數作為讀取的目标,并且沒有指定目标索引參數,那麼該目标緩沖區的 writerIndex 也将被增加;如果寫操作的目标也是 ByteBuf,并且沒有指定源索引的值,則源緩沖區的 readerIndex 也同樣會被增加相同的大小。

3、如果嘗試在緩沖區的可讀位元組數已經耗盡時從中讀取資料, 那麼将會引發一個IndexOutOfBoundsException;如果嘗試往目标寫入超過目标容量的資料,将檢查目前的寫索引以及最大容量是否可以在擴充後容納該資料,可以的話就會配置設定并調整容量,否則将會引發一個IndexOutOfBoundException。

4、通過調用 discardReadBytes()方法, 可以丢棄已讀位元組并回收空間。但不建議頻繁的調用discardReadBytes(),因為将可能導緻記憶體複制:

Netty 系列三(ByteBuf).

5、通過調用 clear()方法來将 readerIndex 和 writerIndex 都設定為 0。 注意,調用 clear()比調用 discardReadBytes()輕量得多, 因為它将隻是重置索引而不會複制任何的記憶體。 

Netty 系列三(ByteBuf).

二、ByteBuf 配置設定

    ByteBuf 是一個抽象類,在程式中直接new ByteBuf() 是不現實的啦!畢竟還要實作一堆的方法,并且缺乏池化的管理。那麼,在 Netty 中要怎麼得到 ByteBuf 呢?

    有兩種方法可以得到 ByteBuf 執行個體,一種是 ByteBufAllocator (實作了池化,有效的降低了配置設定和釋放記憶體的開銷),另一種是 Unpooled (Netty 提供的工具類來建立未池化的ByteBuf 執行個體)。

    ByteBufAllocator:Netty 提供了兩種 ByteBufAllocator 的實作,PooledByteBufAllocator(預設使用)和UnpooledByteBufAllocator。前者池化了ByteBuf的執行個體以提高性能并最大限度地減少記憶體碎片;後者的實作不池化ByteBuf執行個體, 并且在每次它被調用時都會傳回一個新的執行個體。

    建立 ButeBuf 執行個體的代碼如下:

public void createBuf() {
    //獲得ByteBufAllocator 的兩種方式
    //1、
    Channel channel = null;
    ByteBufAllocator alloc = channel.alloc();
    //2、
    ChannelHandlerContext channelHandlerContext = null;
    ByteBufAllocator alloc1 = channelHandlerContext.alloc();
    
    //ByteBufAllocator 建立 ByteBuf 執行個體
    //1、傳回一個基于堆或者直接記憶體存儲的ByteBuf
    ByteBuf byteBuf = alloc.buffer(256, Integer.MAX_VALUE);
    //2、傳回一個基于堆記憶體存儲的 ByteBuf 執行個體
    ByteBuf byteBuf1 = alloc.heapBuffer(256);

    byteBuf1.refCnt();//檢查該ByteBuf的引用計數
    byteBuf1.release();//将ByteBuf的引用計數設為0并釋放

    //3、傳回一個基于直接記憶體存儲的 ByteBuf
    ByteBuf byteBuf2 = alloc.directBuffer();
    //4、傳回一個可以通過添加最大到指定數目的基于堆的或者直接記憶體存儲的緩沖區來擴充的 CompositeByteBuf
    CompositeByteBuf compositeByteBuf = alloc.compositeBuffer();
    CompositeByteBuf compositeByteBuf1 = alloc.compositeHeapBuffer(16);
    CompositeByteBuf compositeByteBuf2 = alloc.compositeDirectBuffer(16);
    //5、傳回一個用于套接字的 I/O 操作的ByteBuf
    ByteBuf byteBuf3 = alloc.ioBuffer();
    
 
    //Unpooled 建立 ByteBuf 執行個體
    //1、建立一個未池化的基于堆記憶體存儲的 ByteBuf 執行個體
    ByteBuf buf = Unpooled.buffer();
    //2、建立一個未池化的基于記憶體存儲的ByteBuf
    ByteBuf buf1 = Unpooled.directBuffer(256, Integer.MAX_VALUE);
    //3、傳回一個包裝了給定資料的 ByteBuf
    Unpooled.wrappedBuffer("Hello Netty".getBytes());
    //4、傳回一個複制了給定資料的 ByteBuf
    Unpooled.copiedBuffer("Hello Netty",CharsetUtil.UTF_8);
}      

三、ByteBuf 操作

    涉及到 ByteBuf 的操作主要是兩個方面,一個是 ByteBuf 的讀/寫,另一個是 ByteBuf 的複制分片操作。

    ByteBuf 的讀/寫操作操作有兩種類别:get() 和 set() 操作,從給定的索引開始,并且保持索引不變,也就是說get() 和 set() 操作并不會改變 readerIndex 和 writerIndex 的值;read()和 write()操作, 從給定的索引開始,并且會根據已經通路過的位元組數對索引進行調整,比如 read() 操作 readerIndex 會根據讀取的資料類型(byte 1個位元組,short 2個位元組,int 4個位元組,long 8個位元組)增加對應的索引數。

    ByteBuf 提供了專門的方式來實作複制分片操作:

duplicate();

slice();

slice(int, int);

Unpooled.unmodifiableBuffer(…);

order(ByteOrder);

readSlice(int)。

    這些方法都将傳回一個新的 ByteBuf 執行個體,它具有自己的讀索引、寫索引和标記索引。其内部存儲和 JDK 的 ByteBuffer 一樣也是共享的。這意味着,如果你修改了它的内容,也同時修改了其對應ByteBuf的源執行個體。

    如果需要一個現有緩沖區的真實副本,請使用 copy()或者 copy(int, int)方法。不同于派生緩沖區,由這個調用所傳回的 ByteBuf 擁有獨立的資料副本。

    現在我們具體來看看 ByteBuf 的操作:

    1、資料周遊

for (int i = 0; i < byteBuf.capacity(); i++) {
    byte aByte = byteBuf.getByte(i);
    System.out.print((char) aByte);
}

while (byteBuf.isReadable()){
    System.out.print((char) byteBuf.readByte());
}      

    2、寫入資料

while (byteBuf.writableBytes() >= 4){
    byteBuf.writeByte(65);
}      

    3、索引标記管理

ByteBuf buf = byteBuf.readerIndex(0);//将 readerIndex 移動到指定的位置
buf.markReaderIndex();//标記目前的 readerIndex
while (buf.isReadable()){
    System.out.print((char) buf.readByte());
}
buf.resetReaderIndex();//回退到之前标記的 readerIndex
while (buf.isReadable()){
    System.out.print((char) buf.readByte());
}

int index = byteBuf.indexOf(0, byteBuf.capacity() - 1, (byte) 65);//在某個範圍内查找某個位元組的索引      

    4、分片操作

ByteBuf slice = byteBuf.slice(0, 15);
System.out.print(slice.toString(CharsetUtil.UTF_8));
//更新索引0處的位元組
slice.setByte(0, (byte) 'J');
byte aByte = byteBuf.getByte(0);
System.out.print("\r\n" + (char)aByte);      

    5、複制操作

ByteBuf copy = buf.copy(0, 15);
System.out.println(copy.toString(CharsetUtil.UTF_8));
copy.setByte(0, (byte) 'A');
System.out.println((char) byteBuf.getByte(0));      

    6、其他操作

System.out.println("如果至少有一個位元組可讀取:" + byteBuf.isReadable());
System.out.println("如果至少有一個位元組可寫入:" + byteBuf.isWritable());
System.out.println("傳回可被讀取的位元組數:" + byteBuf.readableBytes());
System.out.println("傳回可被寫入的位元組數:" + byteBuf.writableBytes());
System.out.println("可容納的位元組數:" + byteBuf.capacity() + ",可擴充最大的位元組數:" + byteBuf.maxCapacity());
System.out.println("是否由一個位元組數組支撐:" + byteBuf.hasArray());
System.out.println("由一個位元組數組支撐則傳回該數組:" + byteBuf.array().length);
System.out.println("計算第一個位元組的偏移量:" + byteBuf.arrayOffset());
System.out.println("傳回Bytebuf的十六進制:" + ByteBufUtil.hexDump(byteBuf.array()));      

四、補充

ByteBuf 的使用模式 :

    1、堆緩沖區:最常用的 ByteBuf 模式是将資料存儲在 JVM 的堆空間中。 這種模式被稱為支撐數組(backing array), 它能在沒有使用池化的情況下提供快速的配置設定和釋放。

    2、直接緩沖區:将資料駐留在會被垃圾回收的堆之外,直接緩沖區對于網絡資料傳輸是最理想的選擇,不過,相對于基于堆的緩沖區,它們的配置設定和釋放都較為昂貴。另外,如果你的資料包含在一個在堆上配置設定的緩沖區中, 那麼事實上,在通過套接字發送它之前, JVM将會在内部把你的緩沖區複制到一個直接緩沖區中。經驗表明,Bytebuf的最佳實踐是在IO通信線程的讀寫緩沖區使用DirectByteBuf,後端業務使用HeapByteBuf。

    3、複合緩沖區:為多個 ByteBuf 提供一個聚合視圖。 在這裡你可以根據需要添加或者删除 ByteBuf 執行個體。

                               Netty 通過一個 ByteBuf 子類——CompositeByteBuf——實作了這個模式, 它提供了一個将多個緩沖區表示為單個合并緩沖區的虛拟表示。

                               使用 CompositeByteBuf 的複合緩沖區模式:

public void CompositeBuffer() {
    CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
    ByteBuf headBuf = Unpooled.copiedBuffer("Hello,", CharsetUtil.UTF_8);
    ByteBuf bodyBuf = Unpooled.copiedBuffer("Netty!", CharsetUtil.UTF_8);
    //将 ByteBuf 執行個體追加到 CompositeByteBuf
    messageBuf.addComponents(headBuf, bodyBuf);
    Iterator<ByteBuf> it = messageBuf.iterator();
    //通路CompositeByteBuf資料
    while(it.hasNext()){
        ByteBuf buf = it.next();
        while (buf.isReadable()){
            System.out.print((char) buf.readByte());
        }
    }
    //使用數組通路資料
    if(!messageBuf.hasArray()){
        int len = messageBuf.readableBytes();
        byte[] arr = new byte[len];
        messageBuf.getBytes(0, arr);
        for (byte b : arr){
            System.out.print((char)b);
        }
    }
    messageBuf.removeComponent(0); //删除位于索引位置為 0(第一個元件)的 ByteBuf
}      

參考資料:《Netty IN ACTION》

示範源代碼:https://github.com/JMCuixy/NettyDemo/blob/master/src/main/java/org/netty/demo/bytebuf/ByteBufOperation.java