前言
网络数据的基本单位永远是 byte(字节)。Java NIO 提供 ByteBuffer 作为字节的容器,但这个类是过于复杂,有点难以使用,切换读写状态需要flip()等。
Netty 中 ByteBuffer 的替代是 ByteBuf,一个强大的实现,解决 JDK 的 API 的限制,以及为网络应用程序开发者一个更好的工具。 但 ByteBuf 并不仅仅暴露操作一个字节序列的方法;这也是专门的 Netty 的 ChannelPipeline 的语义设计。
在本文中,我们会说明相比于 JDK 的 API,ByteBuf 所提供的卓越的功能和灵活性。这也将使我们能够更好地理解了 Netty 的数据处理。
1、Buffer API
主要包括
- ByteBuf
- ByteBufHolder
Netty 使用 reference-counting (引用计数) 来判断何时可以释放 ByteBuf 或 ByteBufHolder 和其他相关资源,从而可以利用池和其他技巧来提高性能和降低内存的消耗。这一点上不需要开发人员做任何事情,但是在开发 Netty 应用程序时,尤其是使用 ByteBuf 和 ByteBufHolder 时,你应该尽可能早地释放池资源。
引用计数:学习过 JVM 的小伙伴应该知道垃圾回收有引用计数法和可达性分析这两种算法判断对象是否存活,Netty 就使用了 引用计数法来优化内存的使用。引用计数确保了当对象的引用计数大于 1 时,对象就不会被释放,当计数减少至 0 时, 对象就会被释放,如果程序访问一个已被释放的引用计数对象,那么将会导致一个 IllegalReferenceCountException 异常。 在 Netty 中,ByteBuf 和 ByteBufHolder 都实现了 ReferenceCounted 接口。
1.1 ByteBuf特点概览
- 用户可以自定义缓冲区类型对其扩展
- 通过内置的符合缓冲区类型实现了透明的零拷贝
- 容量可以按需增长(类似
)StringBuilder
- 切换读写模式不用调用
方法flip()
- 读写使用各自的索引
- 支持方法的链式调用
- 支持引用计数
- 支持池化
2、ByteBuf类介绍
2.1工作模式
ByteBuf
维护了两个指针,一个用于读取(
readerIndex
),一个用于写入(
writerIndex
).
使用ByteBuf的API中的
read*
方法读取数据时,
readerIndex
会根据读取字节数向后移动,但是
get*
方法不会移动
readerIndex
;使用
write*
数据时,
writerIndex
会根据字节数移动,但是
set*
方法不会移动
writerIndex
.(
read*
表示
read
开头的方法,其余意义相同)
读取数据时,如果
readerIndex
超过了
writerIndex
会触发
IndexOutOfBoundsException
.
可以指定
ByteBuf
容量最大值,
capacity(int)
或
ensureWritable(int)
,当超出容量时会抛出异常.
2.2 使用模式
2.2.1 HEAP BUFFER (堆缓冲区)
将ByteBuf存入JVM的堆空间,堆缓冲区可以在没有使用池化的情况下快速分配和释放,非常适合用来处理遗留数据的。
除此之外,ByteBuf的堆缓冲区还提供了一个后备数组(backing array),后备数组和ByteBuf中的数据是对应的,如果修改了backing array中的数据,ByteBuf中的数据是同步的。
它还提供了直接访问数组的方法,通过 ByteBuf.array() 来获取 byte[] 数据。
public static void main(String[] args) {
ByteBuf heapBuf = Unpooled.buffer(1024);
if(heapBuf.hasArray()){
heapBuf.writeBytes("Hello,heapBuf".getBytes());
System.out.println("数组第一个字节在缓冲区中的偏移量:"+heapBuf.arrayOffset());
System.out.println("缓冲区中的readerIndex:"+heapBuf.readerIndex());
System.out.println("writerIndex:"+heapBuf.writerIndex());
System.out.println("缓冲区中的可读字节数:"+heapBuf.readableBytes());//等于writerIndex-readerIndex
byte[] array = heapBuf.array();
for(int i = 0;i < heapBuf.readableBytes();i++){
System.out.print((char) array[i]);
if(i==5){
array[i] = (int)'.';
}
}
//不会修改readerIndex位置
System.out.println("\n读取数据后的readerIndex:"+heapBuf.readerIndex());
//读取缓冲区的数据,查看是否将逗号改成了句号
while (heapBuf.isReadable()){
System.out.print((char) heapBuf.readByte());
}
}
}
输出:
数组第一个字节在缓冲区中的偏移量:0
缓冲区中的readerIndex:0
writerIndex:13
缓冲区中的可读字节数:13
Hello,heapBuf
读取数据后的readerIndex:0
Hello.heapBuf
如果hasArray()返回false,尝试访问backing array会报错
2.2.2 DIRECT BUFFER (直接缓冲区)
在 Java 中,我们创建的对象大部分都是存储在堆区之中的,但这不是绝对的。
在 NIO 的 API 中, 允许 Buffer 分配直接内存,即操作系统的内存。
直接缓冲区存储于JVM堆外的内存空间,这样做有一个好处,当你想把JVM中的数据写给socket,需要将数据复制到直接缓冲区(JVM堆外内存)再交给socket,如果使用直接缓冲区,将减少复制这一过程。
但是直接缓冲区也是有不足的,与JVM堆的缓冲区相比,他们的分配和释放是比较昂贵的。而且还有一个缺点,面对遗留代码的时候,可能不确定ByteBuf使用的是直接缓冲区还是堆缓冲区,你可能需要进行一次额外的复制,与自带后备数组的堆缓冲区来讲,这要多做一些工作。所以,如果确定容器中的数据会被作为数组来访问,你可能更愿意使用堆内存。
//实际上你不知道从哪获得的引用,这可能是一个直接缓冲区的ByteBuf
//忽略Unpooled.buffer方法,当做不知道从哪获得的directBuf
ByteBuf directBuf = Unpooled.buffer(1024);
//如果想要从数组中访问数据,需要将直接缓冲区中的数据手动复制到数组中
if (!directBuf.hasArray()) {
int length = directBuf.readableBytes();
byte[] array = new byte[length];
directBuf.getBytes(directBuf.readerIndex(), array);
handleArray(array, 0, length);
}
2.2.3 COMPOSITE BUFFER (复合缓冲区)
复合缓冲区,我们可以创建多个不同的 ByteBuf,然后提供一个这些 ByteBuf 组合的视图 CompositeByteBuf。我们可以动态的向 CompositeByteBuf 中添加和删除其中的 ByteBuf 实例,JDK 的 ByteBuffer 没有这样的功能。
警告:CompositeByteBuf.hasArray() 总是返回 false,因为它可能既包含堆缓冲区,也包含直接缓冲区。
聚合缓冲区是个非常好用的东西,是多个ByteBuf的聚合视图,可以添加或删除ByteBuf实例。
CompositeByteBuf中的ByteBuf实例可能同事包含直接内存分配和非直接内存分配。如果其中只有一个实例,那么调用CompositeByteBuf中的hasArray()方法将返回该组件上的hasArray()方法的值,否则返回false
多个ByteBuf组成一个完整的消息是很常见的,比如header和body组成的HTTP协议传输的消息。消息中的body有时候可能能重用,我们不想每次都创建重复的body,我们可以通过CompositeByteBuf来复用body。
对比一下JDK中的ByteBuffer实现复合缓冲区和Netty中的CompositeByteBuf.
//JDK版本实现复合缓冲区
public static void byteBufferComposite(ByteBuffer header, ByteBuffer body) {
//使用一个数组来保存消息的各个部分
ByteBuffer[] message = new ByteBuffer[]{ header, body };
// 创建一个新的ByteBuffer来复制合并header和body
ByteBuffer message2 =
ByteBuffer.allocate(header.remaining() + body.remaining());
message2.put(header);
message2.put(body);
message2.flip();
}
//Netty中的CompositeByteBuf
public static void byteBufComposite() {
CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
ByteBuf headerBuf = Unpooled.buffer(1024); // 可能是直接缓存也可能是堆缓存中的
ByteBuf bodyBuf = Unpooled.buffer(1024); // 可能是直接缓存也可能是堆缓存中的
messageBuf.addComponents(headerBuf, bodyBuf);
//...
messageBuf.removeComponent(0); // remove the header
for (ByteBuf buf : messageBuf) {
System.out.println(buf.toString());
}
}
CompositeByteBuf不支持访问其后备数组,所以访问CompositeByteBuf中的数据类似于访问直接缓冲区
CompositeByteBuf compBuf = Unpooled.compositeBuffer();
int length = compBuf.readableBytes();
byte[] array = new byte[length];
//将CompositeByteBuf中的数据复制到数组中
compBuf.getBytes(compBuf.readerIndex(), array);
//处理一下数组中的数据
handleArray(array, 0, array.length);
Netty使用CompositeByteBuf来优化socket的IO操作,避免了JDK缓冲区实现所导致的性能和内存使用率的缺陷.内存使用率的缺陷是指对可复用对象大量的复制,Netty对其在内部做了优化,虽然没有暴露出来,但是应该知道CompositeByteBuf的优势和JDK自带工具的弊端.
JDK的NIO包中提供了Scatter/Gather I/O技术,字面意思是打散和聚合,可以理解为把单个ByteBuffer切分成多个或者把多个ByteBuffer合并成一个.
3、字节级操作
ByteBuf的索引从0开始,最后一个索引是
capacity()-1
.
遍历演示:
ByteBuf buffer = Unpooled.buffer(1024);
for (int i = 0; i < buffer.capacity(); i++) {
byte b = buffer.getByte(i);//这种方法不会移动readerIndex指针
System.out.println((char) b);
}
3.1 readerIndex和writerIndex
JDK中的
ByteBuffer
只有一个索引,需要通过
flip()
来切换读写操作,Netty中的
ByteBuf
既有读索引,也有写索引,通过两个索引把ByteBuf划分了三部分.
可以调用discardReadBytes()方法可丢弃可丢弃字节并回收空间.
调用discardReadBytes()方法之后
使用read或skip方法都会增加readerIndex.
移动readerIndex读取可读数据的方式
ByteBuf buffer = ...;
while (buffer.isReadable()) {
System.out.println(buffer.readByte());
}
write*方法写入ByteBuf时会增加writerIndex,如果超过容量会抛出IndexOutOfBoundException.
writeableBytes()可以返回可写字节数.
ByteBuf buffer = ...;
while (buffer.writableBytes() >= 4) {
buffer.writeInt(random.nextInt());
}
3.2 索引管理
JDK 的定义了
InputStream
和
mark(int readlimit)
reset()
方法,这些方法分别被用来将流中的当前位置标记为指定的值,以及将流重置到该位置。
同样,可以通过调用
、
markReaderIndex()
、
markWriterIndex()
和
resetWriterIndex()
来标记和重置
resetReaderIndex()
的
ByteBuf
和
readerIndex
。这些和
writerIndex
上的调用类似,只是没有
InputStream
参数来指定标记什么时候失效。
readlimit
如果将索引设置到一个无效位置会抛出
IndexOutOfBoundsException
.
可以通过
clear()
归零索引,归零索引不会清除数据.
3.3 查找
ByteBuf中很多方法可以确定值的索引,如
indexOf()
.
复杂查找可以通过那些需要一个
ByteBufProcessor
作为参数的方法完成.这个接口应该可以使用
lambda
表达式(但是我现在使用的Netty4.1.12已经废弃了该接口,应该使用
ByteProcessor
).
ByteBuf buffer = ...;
int index = buffer.forEachByte(ByteProcessor.FIND_CR);
3.4 派生缓冲区
派生缓冲区就是,基于原缓冲区一顿操作生成新缓冲区.比如复制,切分等等.
duplicate()
;
slice()
;
slice(int, int)
;
Unpooled.unmodifiableBuffer(…)
;
order(ByteOrder)
;
readSlice(int)
.
每个这些方法都将返回一个新的 ByteBuf 实例,它具有自己的读索引、写索引和标记索引。 其内部存储和 JDK 的 ByteBuffer 一样也是共享的。这使得派生缓冲区的创建成本是很低廉的,但是这也意味着,如果你修改了它的内容,也同时修改了其对应的源实例,所以要小心
//复制
public static void byteBufCopy() {
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
ByteBuf copy = buf.copy(0, 15);
System.out.println(copy.toString(utf8));
buf.setByte(0, (byte)'J');
assert buf.getByte(0) != copy.getByte(0);
}
//切片
public static void byteBufSlice() {
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
ByteBuf sliced = buf.slice(0, 15);
System.out.println(sliced.toString(utf8));
buf.setByte(0, (byte)'J');
assert buf.getByte(0) == sliced.getByte(0);
}
还有一些读写操作的API,留在文末展示吧.
4、ByteBufHolder接口
从表面理解起来,ByteBufHolder是ByteBuf的持有者,的确没有错。
我们经常发现, 除了实际的数据负载之外, 我们还需要存储各种属性值。 HTTP 响应便是一个很好的例子, 除了表示为字节的内容,还包括状态码、 cookie 等。
为了处理这种常见的用例, Netty 提供了 ByteBufHolder。 ByteBufHolder 也为 Netty 的高级特性提供了支持,如缓冲区池化,其中可以从池中借用 ByteBuf, 并且在需要时自动释放。ByteBufHolder 只有几种用于访问底层数据和引用计数的方法。
5、ByteBuf的分配
前面介绍了ByteBuf的一些基本操作和原理,但却并未说明如何分配一个ByteBuf,这里将讲解ByteBuf的分配方式。
5.1 ByteBufAllocator
为了减少分配和释放内存的开销,Netty通过 ByteBufAllocator 实现了ByteBuf的池化。以下是ByteBufAllocator 的常见方法。
- buffer: 返回一个基于堆或直接内存的ByteBuf,具体取决于实现。
- heapBuffer: 返回一个基于堆内存的ByteBuf。
- directBuffer: 返回一个基于直接内存的ByteBuf。
- compositeBuffer: 返回一个组合ByteBuf。
- ioBuffer: 返回一个用于套接字的ByteBuf。
我们可以通过
ByteBufAllocator
来分配一个
ByteBuf
实例.
ByteBufAllocator
接口实现了ByteBuf的池化。
可以通过
Channel
(每个都可以有一个不同的
ByteBufAllocator
实例)或者绑定到
ChannelHandler
的
ChannelHandlerContext
获取一个到
ByteBufAllocator
的引用。
//从Channel获取一个ByteBufAllocator的引用
Channel channel = ...;
ByteBufAllocator allocator = channel.alloc();
....
//从ChannelHandlerContext获取ByteBufAllocator 的引用
ChannelHandlerContext ctx = ...;
ByteBufAllocator allocator2 = ctx.alloc();
Netty提供了两种ByteBufAllocator的实现: PooledByteBufAllocator和UnpooledByteBufAllocator。PooledByteBufAllocator池化了ByteBuf的实例以提高性能并最大限度地减少内存碎片,此实现的分配内存的方法 是使用 jemalloc,此种 方法分配内存的效率非常高,已被大量现代操作系统采用。 UnpooledByteBufAllocator的实现不 池化ByteBuf实例, 并且在每次它被调用时都会返回一个新的实例。Netty默认使用的是 PooledByteBufAllocator
。
5.2 Unpooled缓冲区
可能有时候拿不到ByteBufAllocator引用的话,可以使用Unpooled工具类来创建未持化ByteBuf实例.
5.3 ByteBufUtil类
ByteBufUtil 提供了用于操作 ByteBuf 的静态的辅助方法。因为这个 API 是通用的, 并且和池化无关,所以这些方法已然在分配类的外部实现。
这些静态方法中最有价值的可能就是 hexdump()方法, 它以十六进制的表示形式打印ByteBuf 的内容。这在各种情况下都很有用,例如, 出于调试的目的记录 ByteBuf 的内容。十六进制的表示通常会提供一个比字节值的直接表示形式更加有用的日志条目,此外,十六进制的版本还可以很容易地转换回实际的字节表示。
另一个有用的方法是 boolean equals(ByteBuf, ByteBuf), 它被用来判断两个 ByteBuf实例的相等性。如果你实现自己的 ByteBuf 子类,你可能会发现 ByteBufUtil 的其他有用方法。
6、引用计数
引用计数是一种通过在某个对象所持有的资源不再被其他对象引用时释放该对象所持有的资源来优化内存使用和性能的技术。 它们都实现了 interface ReferenceCounted。 引用计数背后的想法并不是特别的复杂;它主要涉及跟踪到某个特定对象的活动引用的数量。一个 ReferenceCounted 实现的实例将通常以活动的引用计数为 1 作为开始。只要引用计数大于 0, 就能保证对象不会被释放。当活动引用的数量减少到 0 时,该实例就会被释放。注意,虽然释放的确切语义可能是特定于实现的,但是至少已经释放的对象应该不可再用了。
//从Channel获取ByteBufAllocator
Channel channel = ...;
ByteBufAllocator allocator = channel.alloc();
....
//从ByteBufAllocator分配一个ByteBuf
ByteBuf buffer = allocator.directBuffer();
assert buffer.refCnt() == 1;//引用计数是否为1
7、扩容
扩容是指在ByteBuf在写入数据时,已经超过容量但没有超过最大容量
在容量小于512时,会选择16的整数倍进行扩容超过512时则会选择2^n扩容
比如容量为12,写入数据在13的位置,容量会扩容到16,写出位置在17容量会扩容到32
写入位置在514容量会扩容到1024,写入位置在1026,容量会扩容到2048.......以此类推
8、内存释放
ByteBuf的内存释放是通过ReferenceCounted来实现的(采用的是引用计数的算法来回收内存)
io.netty.util.ReferenceCounted
package io.netty.util;
public interface ReferenceCounted {
/**
* Increases the reference count by {@code 1}.
* 此方法是将bytebuf的引用加1
*/
ReferenceCounted retain();
/**
* Decreases the reference count by {@code 1} and
deallocates this object if the reference count reachesat
* {@code 0}.
* 此方法是将bytebuf的引用-1
* @return {@code true} if and only if the reference count
became {@code 0} and this object has been deallocated
*/
boolean release();
}
当bytebuf的引用计数为0时就会回收调用改bytebuf(不同会有不同的回收算法池化的会入池,非池化的会执行回收内存(直接内存回收和堆内存回收))
9、零拷贝
ByteBuf的slice(int index,intlength);
他是将一个bytebuf按照指定的索引切,切多长的一个buf。内部使用的是同一个内存的buf。用新的指针来指向切出来的buf。
所以在改变一个buf的同时。另一个buf的内容也会改变
(注意,切出来的buf不允许往里添加新的内容,释放了原有的buf的内容后会影响切出来的buf)
duplicate()方法是直接将整个buf的读写指针复制一份(使用的还是同一块内存)
想要真正的复制就使用copy相关的方法
compositeByteBuf
创建
CompositeByteBuf buf = ByteBufAllocator.DEFAULT.compositeBuffer();
//在逻辑上将几个Bytebuf组合在一起
//带上一个boolean参数的方法,自动计算读写指针,不然读写指针还是为0
buf.addComponents(true,buf1,buf2);