前言
小編最近好久沒有更新文章了,為自己先辯解一下,最近上線後身體不适,而且工作比較繁忙(還要小編比較懶)。今天小編和大家分享一下netty架構的線程模型和用法。有了前面兩篇博文Netty架構之深入了解NIO核心元件和Netty架構之NIO多路複用選擇器的基礎,再學習netty應該容易點了吧,同樣廢話不多說,進入正題。
Reactor模式
這裡大家會不會有疑惑,為什麼要介紹Reactor模式,這是因為Netty就是使用該模型實作的。
Reactor模式
Reactor 是反應堆的意思,Reactor 模型是指通過一個或多個輸入同時傳遞給處理器。服務端将它們同步分派給請求對應的處理線程,Reactor 模式也叫 Dispatcher 模式。Netty 整體是整體采用了主從Reactor模型。
Reactor角色
Reactor模型中有三種角色,分别是:
- Acceptor:處理用戶端新連接配接,并分派請求到處理器鍊中
- Reactor:負責監聽和配置設定事件,将I/O事件分派給對應的Handler。
- Handler:事件處理,如編碼、解碼等
Netty架構之線程模型與基礎用法前言Reactor模式Netty線程模型Netty基本用法ServerBootStrap用法總結
Reactor 線程模型
Reactor有三種線程模型分别是:單Reactor單線程模型、單Reactor多線程模型、主從Reactor多線程模型。而Netty正是采用最後一種。
單Reactor單線程模型
該模型下所有請求建立、IO讀寫、業務處理都在一個線程中完成。如果在業務中進行中出現了耗時操作,就會導緻所有請求全部處理延時。因為他們是有由一個線程同步處理的。
單Reactor多線程模型
為了防止業務處理導緻阻塞,在多線程模型下會用一個線程池來異步處理業務,當處理完成後在回寫給用戶端。
主從Reactor多線程模型
單React始終無法發揮現代伺服器多核CPU的并行處理能力,是以Reactor是可以有多個的,并且有一主多從之分。一個主Reactor僅處理連接配接,而多個子Reactor用于處理IO讀寫。然後交給線程池處理業務。Tomcat就是采用該模式實作。
以上就是Reactor的三種線程模型,大家可以和前兩篇博文中的代碼對比一下。應該比較好了解。并且各個模型的優缺點也一目了然。好了進入咱們的重點,netty的線程模型。
Netty線程模型
Netty采用了主從Reactor模型實作,咱們先看下圖:
上面主Reactor即對應Boss group 線程組,多個子Reactor對應Worker Group 線程組。主線程用于接收連接配接,并将建立好的連接配接注冊到Workr 組,當最後IO事件觸發後由對應Pipeline進行處理。
說明了netty線程模型,那小編接下來介紹一下上面模型中的各個元件。讓我們進入netty的基本用法。
Netty基本用法
EventLoop
事件循環器,這裡充當了Reactor的核心。每個EventLoop 都會包含一個Selector選擇器,用于處理IO事件,和一個taskQueue用于存儲使用者送出的任務。此EventLoop會用一個獨有的線程,預設是不啟動的,當有任務觸發時就會啟動,并一直輪詢下去。
代碼示例
以下示例即聲明1大小的線程組,當submit送出任務之後該EventLoop就會啟動
@Test
public void nioEventLoopGroupTest(){
NioEventLoopGroup group = new NioEventLoopGroup(1);
group.submit(() -> System.out.println("submit:"+Thread.currentThread().getId()));
// 優雅關閉EventLoop
group.shutdownGracefully();
}
除了送出任務外,其更重要的事情是處理Channel 相關IO事件。如管道注冊。調用register方法最終會調用NIO當中的register方法進行注冊,隻不過Netty已經實作封裝好了,并且處理好了同步鎖的問題。
Netty 為了安全的調用IO操作,把所有對IO的直接操作都封狀了一個任務,交由IO線程執行。是以我們通過Netty來調用IO是不會立即傳回的
NioChannel與Channelhandler
原有的Nio中的Channel 被封裝成了NioChannel,當然最終底層還是在調用NIO的Channel。原來對Channel 中讀寫事件處理被封裝成Channelhandler進行處理,并用引入Pipeline的概念。我們先來了解一下他們的用法。
代碼示例:
小編依舊使用tcp來做個netty的基礎用法:
@Test
public void nettySocketTest(){
//打開管道、注冊選擇器最後綁定端口
NioEventLoopGroup group = new NioEventLoopGroup(1);
NioServerSocketChannel nioServerSocketChannel = new NioServerSocketChannel();
group.register(nioServerSocketChannel);
nioServerSocketChannel.bind(new InetSocketAddress(8888));
nioServerSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//處理連接配接和讀操作
@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
System.out.println("連接配接建立");
handlerAcceptAndRead(group,msg);
}
});
while (true);
}
private void handlerAcceptAndRead(NioEventLoopGroup group, Object msg) {
//這裡因為netty已經幫我們封裝好了,直接拿就行了
NioSocketChannel nioSocketChannel = (NioSocketChannel)msg;
group.register(nioSocketChannel);
nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
System.out.println(byteBuf.toString(Charset.defaultCharset()));
}
});
}
然後是測試工具測試:
測試結果:
不過以上和小編話的線程模型不像啊,因為沒有主從啊。接下來小編修改一下
@Test
public void nettySocketTest(){
//打開管道、注冊選擇器最後綁定端口
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
//建立多個子EventLoopGrou
NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
NioServerSocketChannel nioServerSocketChannel = new NioServerSocketChannel();
//主EventLoopGrou隻注冊連接配接
bossGroup.register(nioServerSocketChannel);
nioServerSocketChannel.bind(new InetSocketAddress(8888));
nioServerSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//處理連接配接和讀操作
@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
System.out.println("連接配接建立");
//幹活的有子EventLoopGrou
handlerAcceptAndRead(workerGroup,msg);
}
});
while (true);
}
測試:
列印内容
ServerBootStrap用法
有了以上的基本用法的代碼,小編給大家講一下現在市面上或者部落格中普遍的用法,使用ServerBootStrap。這是在上面的代碼基礎上又進行了一次封裝。用起來更加簡單。
小編再次用一個http服務分享一下使用方法:
public class HttpNettyTest {
public void open(int port) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup(81);
serverBootstrap.group(boss, worker);
serverBootstrap.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
// 輸入流 先解碼
channel.pipeline().addLast("decode", new HttpRequestDecoder());
//請求體和請求頭放一起對應下面的FullHttpRequest
channel.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
//處理業務
channel.pipeline().addLast("servletHandler", new MyServlet());
// 輸出流 編碼
channel.pipeline().addFirst("encode", new HttpResponseEncoder());
}
});
ChannelFuture future = serverBootstrap.bind(port);
future.addListener(future1 -> System.out.println("服務啟動成功"));
}
private class MyServlet extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
//請求頭
if (msg instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) msg;
System.out.println("目前請求url:" + httpRequest.uri());
}
if (msg instanceof FullHttpRequest) {
//請求頭和請求體在一塊
//一般符合我們的開發
}
//請求體
if (msg instanceof HttpContent) {
HttpContent httpContent = (HttpContent) msg;
ByteBuf content = httpContent.content();
byte[] bytes = new byte[content.readableBytes()];
content.readBytes(bytes);
System.out.println("目前上傳内容内容:" + new String(bytes));
}
if (msg instanceof LastHttpContent) {
LastHttpContent httpContent = (LastHttpContent) msg;
ByteBuf content = httpContent.content();
byte[] bytes = new byte[content.readableBytes()];
content.readBytes(bytes);
System.out.println("目前請求内容是最後一個包:" + new String(bytes));
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=utf-8");
response.content().writeBytes("上傳完畢".getBytes());
ChannelFuture future = channelHandlerContext.writeAndFlush(response);
future.addListener(ChannelFutureListener.CLOSE);
}
}
}
@Test
public void openServer() {
open(8080);
while (true);
}
}
使用postman測試:
測試結果
以上示例就明白了。這個隻是怎麼用。ServerBootStrap底層就是上面代碼。大家自行領悟。
總結
今天小編主要将了reactor的模型演講以及netty線程模型,和其簡單的應用。今天還是比較簡單的。其實今天小編沒有具體講解netty中的核心元件,下次小編将會講解,期待小編下一次的分享吧。