1、Netty是由JBOSS提供的一個java開源架構。Netty提供異步的、事件驅動的網絡應用程式架構和工具,用以快速開發高性能、高可靠性的網絡伺服器和用戶端程式。也就是說,Netty 是一個基于NIO的客戶、伺服器端程式設計架構,使用Netty 可以確定你快速和簡單的開發出一個網絡應用,例如實作了某種協定的客戶,服務端應用。Netty相當簡化和流線化了網絡應用的程式設計開發過程,例如,TCP和UDP的socket服務開發。
2、目前netty有3個版本netty3、netty4、netty5。3個版本的内容有所不同。neety3是核心的代碼介紹。相對于netty4、和netty5的複雜性來說。netty3的源碼是值得學習的。我這裡解析了netty3的一些源碼,僅供大家了解,也是為了友善大家了解做了很多簡化。不代表作者的開發思路。
3、我們先來看一張圖(這張圖是我在學習源碼的時候扣的,哈哈)
一、傳統NIO流

1)一個線程裡面,存在一個selector,當然這個selector也承擔起看大門和服務客人的工作。
2)這裡不管多少用戶端進來,都是這個selector來處理。這樣就就加大了這個服務員的工作量
3)為了加入線程池,讓多個selector同時工作,當時目的性都是一樣的。
4)雖然看大門的和服務客人的都是服務員,但是還是存在差别的。為了更好的處理多個線程的問題。是以這裡netty就誕生了。
二、netty架構
了解:
1)netty3的架構也是基于nio流做出來的。是以這裡會詳細介紹netty3架構的思路
2)将看門的服務員和服務客人的服務員分開。形成兩塊(也就是2個線程池,也就是後面的boss和worker)
3)當一個客人來的時候,首先boss,進行接待。然後boss配置設定工作給worker,這個,在兩個線程池的工作下,有條不亂。
4)原理:就是将看大門的selector和服務客人的selector分開。然後通過boss線程池,下發任務給對應的worker
4、netty3源碼分析
1)加入對應的jar包。我這裡為了了解源碼用的是netty3的包。
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.10.6.Final</version>
</dependency>
2)目錄結構
說明:
a、NettyBoss、NettyWork是針對于selector做區分。雖然他們很多共性,我這裡為了好了解,并沒有做抽象類(忽略開發思路)。
b、ThreadHandle是用來初始化線程池和對應的接口。
c、Start為啟動類
3)NettyBoss(看大門的服務員,第一種線程selector)
package com.troy.application.netty;
import java.io.IOException;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
public class NettyBoss {
//線程池
public final Executor executor;
//boss選擇器
protected Selector selector;
//原子變量,主要是用來保護線程安全。當本線程執行的時候,排除其他線程的執行
protected final AtomicBoolean wakenUp = new AtomicBoolean();
//隊列,線程安全隊列。
public final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
//線程處理,這裡主要是拿到work的線程池
protected ThreadHandle threadHandle;
//初始化
public NettyBoss(Executor executor,ThreadHandle threadHandle) {
//指派
this.executor = executor;
this.threadHandle = threadHandle;
try {
//每一個線程選擇器
this.selector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
//從線程中擷取一個線程執行以下内容
executor.execute(() -> {
while (true) {
try {
//這裡的目前就是排除其他線程同僚執行,false因為這裡處于阻塞狀态,不用開啟
wakenUp.set(false);
//選擇器阻塞
selector.select();
//運作隊列中的任務
while (true) {
final Runnable task = taskQueue.poll();
if (task == null) {
break;
}
//如果任務存在開始運作
task.run();
}
//對進來的進行處理
this.process(selector);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
public void process(Selector selector) throws IOException {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (selectedKeys.isEmpty()) {
return;
}
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey key = i.next();
i.remove();
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 新用戶端
SocketChannel channel = server.accept();
// 設定為非阻塞
channel.configureBlocking(false);
// 擷取一個worker
NettyWork nextworker = threadHandle.workeres[Math.abs(threadHandle.workerIndex.getAndIncrement() % threadHandle.workeres.length)];
// 注冊新用戶端接入任務
Runnable runnable = () -> {
try {
//将用戶端注冊到selector中
channel.register(nextworker.selector, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
};
//添加到work的隊列中
nextworker.taskQueue.add(runnable);
if (nextworker.selector != null) {
//這裡的目前就是開啟執行過程
if (nextworker.wakenUp.compareAndSet(false, true)) {
//放開本次阻塞,進行下一步執行
nextworker.selector.wakeup();
}
} else {
//任務完成移除線程
taskQueue.remove(runnable);
}
System.out.println("新用戶端連結");
}
}
}
解釋:
a、初始化的時候,指派線程池,和線程處理類(線程處理類目的是擷取worker的工作線程)
b、executor為線程池的執行過程。
c、selector.select()為形成阻塞,wakenUp為了線程安全考核。在接入用戶端的時候用selector.wakeup()來放開本次阻塞(很重要)。
d、然後在worker安全隊列中執行對應工作。(taskQueue的目前在boss和worker中的作用都是為了考慮線程安全,這裡采用線程安全隊列的目的是為了不直接操作其他線程)
e、wakenUp.compareAndSet(false, true),這裡是考慮并發問題。在本線程運作的時候,其他線程處于等待狀态。這裡也是為了線程安全考慮。
4)NettyWork(服務客人的服務員,第二種selector)
package com.troy.application.netty;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
public class NettyWork {
//線程池
public final Executor executor;
//boss選擇器
protected Selector selector;
//原子變量,主要是用來保護線程安全。當本線程執行的時候,排除其他線程的執行
protected final AtomicBoolean wakenUp = new AtomicBoolean();
//隊列,線程安全隊列。
public final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
//初始化
public NettyWork(Executor executor) {
this.executor = executor;
try {
//每一個work也需要一個選擇器用來管理通道
this.selector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
//從線程池中擷取一個線程開始執行
executor.execute(() -> {
while (true) {
try {
//阻塞狀态排除問題
wakenUp.set(false);
//阻塞
selector.select();
//處理work任務
while (true) {
final Runnable task = taskQueue.poll();
if (task == null) {
break;
}
//存在work任務開始執行
task.run();
}
//處理任務
this.process(selector);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
public void process(Selector selector) throws IOException {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
// 移除,防止重複處理
ite.remove();
// 得到事件發生的Socket通道
SocketChannel channel = (SocketChannel) key.channel();
// 資料總長度
int ret = 0;
boolean failure = true;
ByteBuffer buffer = ByteBuffer.allocate(1024);
//讀取資料
try {
ret = channel.read(buffer);
failure = false;
} catch (Exception e) {
// ignore
}
//判斷是否連接配接已斷開
if (ret <= 0 || failure) {
key.cancel();
System.out.println("用戶端斷開連接配接");
}else{
System.out.println("收到資料:" + new String(buffer.array()));
//回寫資料
ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes());
channel.write(outBuffer);// 将消息回送給用戶端
}
}
}
a、worker的執行方式基本上面和boss的方式是一樣的,隻不夠是處理方式不一樣
b、這裡需要注意的是,都是考慮線程隊列執行。
3)ThreadHandle(線程處理,這裡主要是啟動需要的東西)
package com.troy.application.netty;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadHandle {
public final AtomicInteger bossIndex = new AtomicInteger();
public static NettyBoss[] bosses;
public final AtomicInteger workerIndex = new AtomicInteger();
public static NettyWork[] workeres;
public ThreadHandle(ExecutorService boss,ExecutorService work) {
this.bosses = new NettyBoss[1];
//初始化boss線程池
for (int i = 0; i < bosses.length; i++) {
bosses[i] = new NettyBoss(boss,this);
}
this.workeres = new NettyWork[Runtime.getRuntime().availableProcessors() * 2];
//初始化work線程池
for (int i = 0; i < workeres.length; i++) {
workeres[i] = new NettyWork(work);
}
}
public void bind(InetSocketAddress inetSocketAddress) {
try {
// 獲得一個ServerSocket通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 設定通道為非阻塞
serverChannel.configureBlocking(false);
// 将該通道對應的ServerSocket綁定到port端口
serverChannel.socket().bind(inetSocketAddress);
//擷取一個boss線程
NettyBoss nextBoss = bosses[Math.abs(bossIndex.getAndIncrement() % workeres.length)];
//向boss注冊一個ServerSocket通道
Runnable runnable = () -> {
try {
//注冊serverChannel到selector
serverChannel.register(nextBoss.selector, SelectionKey.OP_ACCEPT);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
};
//加入任務隊列
nextBoss.taskQueue.add(runnable);
if (nextBoss.selector != null) {
//排除其他任務處理
if (nextBoss.wakenUp.compareAndSet(false, true)) {
//放開阻塞
nextBoss.selector.wakeup();
}
} else {
//移除任務
nextBoss.taskQueue.remove(runnable);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
a、這裡采用數組的形式,主要目的是考慮多個看門的,和多個服務客人的線程。為了好控制,好選擇,哪一個來執行。
b、端口的注冊,在NettyBoss裡面進行初始化的的原理都是一樣的。
4)start
package com.troy.application.netty;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Start {
public static void main(String[] args) {
//聲明線程池
ExecutorService boss = Executors.newCachedThreadPool();
ExecutorService work = Executors.newCachedThreadPool();
//初始化線程池
ThreadHandle threadHandle = new ThreadHandle(boss,work);
//聲明端口
threadHandle.bind(new InetSocketAddress(9000));
System.out.println("start");
}
}
說明一下流程
a、初始化boss和work。讓boss線程池加入設定第一種boss的selector,并且處于阻塞狀态。work的初始化也基本上是一樣的,隻不過換成了第二種selector線程池,處于阻塞狀态。
b、當線程處理類初始化監聽端口的時候。就是選擇boss中其中一個selector。聲明一個線程先監聽,加入boss的線程安全隊列中。然後放開boss阻塞,向下執行。線程執行會監聽對應端口并阻塞。
c、當一個用戶端接入的時候,boss中的selector會監聽到對應端口。然後選擇work線程中的一個selector給work分派任務。
d、最後work中的selector來處理事務。
4、源碼下載下傳:
https://pan.baidu.com/s/1pKIxuMf5、本代碼隻是用于了解netty的實作過程,不代表開發思路。其中我為了簡化代碼,做了很多調整。目的就是壓縮代碼,友善了解。