簡介
之前的系列文章中我們學到了netty的基本結構和工作原理,各位小夥伴一定按捺不住心中的喜悅,想要開始手寫代碼來體驗這神奇的netty架構了,剛好最近東京奧運會,我們寫一個netty的用戶端和伺服器為中國加油可好?
場景規劃
那麼我們今天要搭建什麼樣的系統呢?
首先要搭建一個server伺服器,用來處理所有的netty客戶的連接配接,并對用戶端發送到伺服器的消息進行處理。
還要搭建一個用戶端,這個用戶端負責和server伺服器建立連接配接,并發送消息給server伺服器。在今天的例子中,用戶端在建立連接配接過後,會首先發送一個“中國”消息給伺服器,然後伺服器收到消息之後再傳回一個”加油!“ 消息給用戶端,然後用戶端收到消息之後再發送一個“中國”消息給伺服器…. 以此往後,循環反複直到奧運結束!
我們知道用戶端和伺服器端進行消息處理都是通過handler來進行的,在handler裡面,我們可以重寫channelRead方法,這樣在讀取channel中的消息之後,就可以對消息進行處理了,然後将用戶端和伺服器端的handler配置在Bootstrap中啟動就可以了,是不是很簡單?一起來做一下吧。
啟動Server
假設server端的handler叫做CheerUpServerHandler,我們使用ServerBootstrap建構兩個EventLoopGroup來啟動server,有看過本系列最前面文章的小夥伴可能知道,對于server端需要啟動兩個EventLoopGroup,一個bossGroup,一個workerGroup,這兩個group是父子關系,bossGroup負責處理連接配接的相關問題,而workerGroup負責處理channel中的具體消息。
啟動服務的代碼千篇一律,如下所示:
// Server配置
//boss loop
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//worker loop
EventLoopGroup workerGroup = new NioEventLoopGroup();
final CheerUpServerHandler serverHandler = new CheerUpServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// tcp/ip協定listen函數中的backlog參數,等待連接配接池的大小
.option(ChannelOption.SO_BACKLOG, 100)
//日志處理器
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
//初始化channel,添加handler
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//日志處理器
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// 啟動伺服器
ChannelFuture f = b.bind(PORT).sync();
// 等待channel關閉
f.channel().closeFuture().sync();
不同的服務,啟動伺服器的代碼基本都是一樣的,這裡我們需要注意這幾點。
在ServerBootstrap中,我們加入了一個選項:ChannelOption.SO_BACKLOG,ChannelOption.SO_BACKLOG對應的是tcp/ip協定listen(int socketfd,int backlog)函數中的backlog參數,用來初始化服務端可連接配接隊列,backlog參數指定了這個隊列的大小。因為對于一個連接配接來說,處理用戶端連接配接請求是順序處理的,是以同一時間隻能處理一個用戶端連接配接,多個用戶端來的時候,服務端将不能處理的用戶端連接配接請求放在隊列中等待處理,
另外我們還添加了兩個LoggingHandler,一個是給handler添加的,一個是給childHandler添加的。LoggingHandler主要監控channel中的各種事件,然後輸出對應的消息,非常好用。
比如在伺服器啟動的時候會輸出下面的日志:
[nioEventLoopGroup-2-1] INFO i.n.handler.logging.LoggingHandler - [id: 0xd9b41ea4] REGISTERED
[nioEventLoopGroup-2-1] INFO i.n.handler.logging.LoggingHandler - [id: 0xd9b41ea4] BIND: 0.0.0.0/0.0.0.0:8007
[nioEventLoopGroup-2-1] INFO i.n.handler.logging.LoggingHandler - [id: 0xd9b41ea4, L:/0:0:0:0:0:0:0:0:8007] ACTIVE
這個日志是第一個LoggingHandler輸出的,分别代表了伺服器端的REGISTERED、BIND和ACTIVE事件。從輸出我們可以看到,伺服器本身綁定的是0.0.0.0:8007。
在用戶端啟動和伺服器端建立連接配接的時候會輸出下面的日志:
[nioEventLoopGroup-2-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x37a4ba9f, L:/0:0:0:0:0:0:0:0:8007] READ: [id: 0x6dcbae9c, L:/127.0.0.1:8007 - R:/127.0.0.1:54566]
[nioEventLoopGroup-2-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x37a4ba9f, L:/0:0:0:0:0:0:0:0:8007] READ COMPLETE
上面日志表示READ和READ COMPLETE兩個事件,其中 L:/127.0.0.1:8007 – R:/127.0.0.1:54566 代表本地伺服器的8007端口連接配接了用戶端的54566端口。
對于第二個LoggingHandler來說,會輸出一些具體的消息處理相關的消息。比如REGISTERED、ACTIVE、READ、WRITE、FLUSH、READ COMPLETE等事件,這裡面就不一一列舉了。
啟動用戶端
同樣的,假設用戶端的handler名稱叫做ChinaClientHandler,那麼可以類似啟動server一樣啟動用戶端,如下:
// 用戶端的eventLoop
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//添加日志處理器
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new ChinaClientHandler());
}
});
// 啟動用戶端
ChannelFuture f = b.connect(HOST, PORT).sync();
用戶端啟動使用的是Bootstrap,我們同樣為他配置了一個LoggingHandler,并添加了自定義的ChinaClientHandler。
消息處理
我們知道有兩種handler,一種是inboundHandler,一種是outboundHandler,這裡我們是要監控從socket讀取資料的事件,是以這裡用戶端和伺服器端的handler都繼承自ChannelInboundHandlerAdapter即可。
消息處理的流程是用戶端和伺服器建立連接配接之後,會首先發送一個”中國“的消息給伺服器。
用戶端和伺服器建立連接配接之後,會觸發channelActive事件,是以在用戶端的handler中就可以發送消息了:
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush("中國");
}
伺服器端在從channel中讀取消息的時候會觸發channelRead事件,是以伺服器端的handler可以重寫channelRead方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
log.info("收到消息:{}",msg);
ctx.writeAndFlush("加油!");
}
然後用戶端從channel中讀取到”加油!”之後,再将”中國“寫到channel中,是以用戶端也需要重寫方法channelRead:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.writeAndFlush("中國");
}
這樣是不是就可以循環往複的進行下去了呢?
消息進行中的陷阱
事實上,當你執行上面代碼你會發現,用戶端确實将”中國“ 消息寫入了channel,但是伺服器端的channelRead并沒有被觸發。為什麼呢?
研究發下,如果寫入的對象是一個String,程式内部會有這樣的錯誤,但是這個錯誤是隐藏的,你并不會在運作的程式輸出中看到,是以對新手小夥伴還是很不友好的。這個錯誤就是:
DefaultChannelPromise@57f5c075(failure: java.lang.UnsupportedOperationException: unsupported message type: String (expected: ByteBuf, FileRegion))
從錯誤的資訊可以看出,目前支援的消息類型有兩種,分别是ByteBuf和FileRegion。
好了,我們将上面的消息類型改成ByteBuf試一試:
message = Unpooled.buffer(ChinaClient.SIZE);
message.writeBytes("中國".getBytes(StandardCharsets.UTF_8));
public void channelActive(ChannelHandlerContext ctx) {
log.info("可讀位元組:{},index:{}",message.readableBytes(),message.readerIndex());
log.info("可寫位元組:{},index:{}",message.writableBytes(),message.writerIndex());
ctx.writeAndFlush(message);
}
上面我們定義了一個ByteBuf的全局message對象,并将其發送給server,然後在server端讀取到消息之後,再發送一個ByteBuf的全局message對象給client,如此循環往複。
但是當你運作上面的程式之後會發現,伺服器端确實收到了”中國“,用戶端也确實收到了”加油!“,但是用戶端後續發送的”中國“消息伺服器端卻收不到了,怎麼回事呢?
我們知道ByteBuf有readableBytes、readerIndex、writableBytes、writerIndex、capacity和refCnt等屬性,我們将這些屬性在message發送前和發送之後進行對比:
在消息發送之前:
可讀位元組:6,readerIndex:0
可寫位元組:14,writerIndex:6
capacity:20,refCnt:1
在消息發送之後:
可讀位元組:6,readerIndex:0
可寫位元組:-6,writerIndex:6
capacity:0,refCnt:0
于是問題找到了,由于ByteBuf在處理過一次之後,refCnt變成了0,是以無法繼續再次重複寫入,怎麼解決呢?
簡單的辦法就是每次發送的時候再重新new一個ByteBuf,這樣就沒有問題了。
但是每次都建立一個對象好像有點浪費空間,怎麼辦呢?既然refCnt變成了0,那麼我們調用ByteBuf中的retain()方法增加refCnt不就行了?
答案就是這樣,但是要注意,需要在發送之前調用retain()方法,如果是在消息被處理過後調用retain()會報異常。
總結
好了,運作上面的程式就可以一直給中國加油了,YYDS!
本文的例子可以參考:
learn-netty4本文已收錄于 http://www.flydean.com/06-netty-cheerup-china/最通俗的解讀,最深刻的幹貨,最簡潔的教程,衆多你不知道的小技巧等你來發現!
歡迎關注我的公衆号:「程式那些事」,懂技術,更懂你!