一、前言
Netty 為許多通用協定提供了編解碼器和處理器,幾乎可以開箱即用, 這減少了你在那些相當繁瑣的事務上本來會花費的時間與精力。另外,這篇文章中,就不涉及 Netty 對 WebSocket協定 的支援了,因為涉及的篇幅有點大,會在下一篇文章做一個具體的介紹。
二、SSL 協定
SSL 協定是安全協定,層疊在其他協定之上。為了支援 SSL/TLS, Java 提供了 javax.net.ssl 包,它的 SSLContext 和 SSLEngine 類使得實作解密和加密相當簡單直接。 Netty 通過一個名為 SslHandler 的 ChannelHandler 實作利用了這個 API, 其中 SslHandler 在内部使用 SSLEngine 來完成實際的工作。下圖描述的是 SslHandler 的資料流。

@Override
protected void initChannel(Channel ch) throws Exception {
ByteBufAllocator byteBufAllocator = ch.alloc();
//對于每個 SslHandler 執行個體,都使用 Channel 的 ByteBufAllocator 從 SslContext 擷取一個新的 SSLEngine
SSLEngine sslEngine = context.newEngine(byteBufAllocator);
//伺服器端模式,用戶端模式設定為true
sslEngine.setUseClientMode(false);
//不需要驗證用戶端,用戶端不設定該項
sslEngine.setNeedClientAuth(false);
//要将 SslHandler 設定為第一個 ChannelHandler。這確定了隻有在所有其他的 ChannelHandler 将他們的邏輯應用到資料之後,才會進行加密。
//startTls 如果為true,第一個寫入的消息将不會被加密(用戶端應該設定為true)
ch.pipeline().addFirst("ssl",new SslHandler(sslEngine, startTls));
}
tips:對于 ChannelPipeline 鍊中 ChannelHandler 執行的順序 —— 入站事件順序執行、出站事件逆序執行。
三、HTTP 協定
HTTP 是基于請求/響應模式的:用戶端向伺服器發送一個 HTTP 請求,然後伺服器将會傳回一個 HTTP 響應。 下圖展示了 Netty 中 HTTP請求和響應的組成部分:
Netty 對 HTTP 協定的支援主要提供了以下 ChannelHandler:
HttpResponseDecoder:解碼器,用于用戶端,解碼來自服務端的響應。
HttpRequestEncoder:編碼器,使用者用戶端,編碼向服務端發送的請求。
HttpRequestDecoder:解碼器,用于服務端,解碼來自用戶端的請求。
HttpResponseEncoder:編碼器,用于服務端,編碼向用戶端的響應。
HttpClientCodec:編解碼器,使用者用戶端,效果等于 HttpResponseDecoder + HttpRequestEncoder。
HttpServerCodec:編解碼器,使用者服務端,效果等于 HttpRequestDecoder + HttpResponseEncoder。
HttpObjectAggregator:聚合器,由于 HTTP 的請求和響應可能由許多部分組成,需要聚合它們以形成完整的消息,HttpObjectAggregator 可以将多個消息部分合并為 FullHttpRequest 或者 FullHttpResponse 消息。
HttpContentCompressor:壓縮,使用者服務端,壓縮要傳輸的資料,支援 gzip 和 deflate 壓縮格式。
HttpContentDecompressor:解壓縮,用于用戶端,解壓縮服務端傳輸的資料。
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
SSLEngine sslEngine = sslContext.newEngine(ch.alloc());
if (isClient) {
//使用 HTTPS,添加 SSL 認證
pipeline.addFirst("ssl", new SslHandler(sslEngine, true));
pipeline.addLast("codec", new HttpClientCodec());
//1、建議開啟壓縮功能以盡可能多地減少傳輸資料的大小
//2、用戶端處理來自伺服器的壓縮内容
pipeline.addLast("decompressor", new HttpContentDecompressor());
}else {
pipeline.addFirst("ssl", new SslHandler(sslEngine));
//HttpServerCodec:将HTTP用戶端請求轉成HttpRequest對象,将HttpResponse對象編碼成HTTP響應發送給用戶端。
pipeline.addLast("codec", new HttpServerCodec());
//服務端,壓縮資料
pipeline.addLast("compressor", new HttpContentCompressor());
}
//目的多個消息轉換為一個單一的FullHttpRequest或是FullHttpResponse
//将最大的消息為 512KB 的HttpObjectAggregator 添加到 ChannelPipeline
//在消息大于這個之後會抛出一個 TooLongFrameException 異常。
pipeline.addLast("aggregator", new HttpObjectAggregator(512 * 1024));
}
tips:當使用 HTTP 時,建議開啟壓縮功能以盡可能多地減小傳輸資料的大小。雖然壓縮會帶來一些 CPU 時鐘周期上的開銷。
四、拆包和粘包的解決方案
TCP 傳輸過程中,用戶端發送了兩個資料包,而服務端卻隻收到一個資料包,用戶端的兩個資料包粘連在一起,稱為粘包;
TCP 傳輸過程中,用戶端發送了兩個資料包,服務端雖然收到了兩個資料包,但是兩個資料包都是不完整的,或多了資料,或少了資料,稱為拆包;
發生TCP粘包、拆包主要是由于下面一些原因:
1、應用程式寫入的資料大于套接字緩沖區大小,這将會發生拆包。
2、應用程式寫入資料小于套接字緩沖區大小,網卡将應用多次寫入的資料發送到網絡上,這将會發生粘包。
3、進行MSS(最大封包長度)大小的TCP分段,當TCP封包長度-TCP頭部長度>MSS的時候将發生拆包。
4、接收方法不及時讀取套接字緩沖區資料,這将發生粘包。
Netty 預定義了一些解碼器用于解決粘包和拆包現象,其中大體分為兩類:
基于分隔符的協定:在資料包之間使用定義的字元來标記消息或者消息段的開頭或者結尾。這樣,接收端通過這個字元就可以将不同的資料包拆分開。
基于長度的協定:發送端給每個資料包添加標頭部,頭部中應該至少包含資料包的長度,這樣接收端在接收到資料後,通過讀取標頭部的長度字段,便知道每一個資料包的實際長度了。
基于分隔符的協定
public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
// 将提取到的桢轉發給下一個Channelhandler
new LineBasedFrameDecoder(64 * 1024),
// 添加 FrameHandler 以接收幀
new FrameHandler()
);
}
public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
//Do something with the data extracted from the frame
}
}
}
基于長度的協定
LengthFieldBasedFrameDecoder 是 Netty 基于長度協定解決拆包粘包問題的一個重要的類,主要結構就是 header+body 結構。我們隻需要傳入正确的參數就可以發送和接收正确的資料,那嗎重點就在于這幾個參數的意義。下面我們就具體了解一下這幾個參數的意義。先來看一下LengthFieldBasedFrameDecoder主要的構造方法:
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip)
maxFrameLength:最大幀長度。也就是可以接收的資料的最大長度。如果超過,此次資料會被丢棄。
lengthFieldOffset:長度域偏移。就是說資料開始的幾個位元組可能不是表示資料長度,需要後移幾個位元組才是長度域。
lengthFieldLength:長度域位元組數。用幾個位元組來表示資料長度。
lengthAdjustment:資料長度修正。因為長度域指定的長度可以使 header+body 的整個長度,也可以隻是body的長度。如果表示header+body的整個長度,那麼我們需要修正資料長度。
initialBytesToStrip:跳過的位元組數。如果你需要接收 header+body 的所有資料,此值就是0,如果你隻想接收body資料,那麼需要跳過header所占用的位元組數。
public class LengthBasedInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
new LengthFieldBasedFrameDecoder(64 * 1024, 0, 8),
new FrameHandler()
);
}
public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
//處理桢的資料
}
}
}
tips:UDP協定不會發生沾包或拆包現象, 因為UDP是基于封包發送的,在UDP首部采用了16bit來訓示UDP資料封包的長度,是以在應用層能很好的将不同的資料封包區分開。
五、其他
由于網絡飽和的可能性,如何在異步架構中高效地寫大塊的資料是一個特殊的問題。Netty 通過一個 FileRegion 接口來實作,其在 Netty 的API 文檔中的定義是:"通過支援零拷貝的檔案傳輸的 Channel 來發送的檔案區域"。但是該接口隻适用于檔案内容的直接傳輸,不包括應用程式對檔案資料的任何處理。
View Code
如果大塊的資料要從檔案系統複制到使用者記憶體中時,可以安裝一個 ChunkedWriteHandler,并用 ChunkedInput 實作寫入檔案資料。 它支援異步寫大型資料流,而又不會導緻大量的記憶體消耗。
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
private final File file;
private final SslContext sslCtx;
public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
this.file = file;
this.sslCtx = sslCtx;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
new SslHandler(sslCtx.newEngine(ch.alloc())),
// 添加 ChunkedWriteHandler 以處理作為 ChunkedInput 傳入的資料
new ChunkedWriteHandler(),
new WriteStreamHandler()
);
}
private final class WriteStreamHandler extends ChannelHandlerAdapter {
//當連接配接建立時,channelActive() 方法将使用 ChunkedInput 寫檔案資料
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
}
}
}
ChunkedWriteHandlerInitializer.java
Netty提供的用于和JDK進行互操作的序列化類 :
Netty提供的用于和 JBoss Marshalling 進行互操作的序列化類 :
public class MarshallingInitializer extends ChannelInitializer<Channel> {
private final MarshallerProvider marshallerProvider;
private final UnmarshallerProvider unmarshallerProvider;
public MarshallingInitializer(MarshallerProvider marshallerProvider, UnmarshallerProvider unmarshallerProvider) {
this.marshallerProvider = marshallerProvider;
this.unmarshallerProvider = unmarshallerProvider;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
new MarshallingDecoder(unmarshallerProvider),
new MarshallingEncoder(marshallerProvider),
new ObjectHandler()
);
}
public static final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {
@Override
protected void messageReceived(ChannelHandlerContext ctx, Serializable msg) throws Exception { }
}
}
MarshallingInitializer.java
Netty提供的用于和 Protocol Buffers 進行互操作的序列化類 :
參考資料:《Netty IN ACTION》
示範源代碼:https://github.com/JMCuixy/NettyDemo/tree/master/src/main/java/org/netty/demo/protocol