前言
Java NIO 由以下幾個核心部分組成:
- Buffer
- Channel
- Selector
以前基于net包進行socket程式設計時,accept方法會一直阻塞,直到有用戶端請求的到來,并傳回socket進行相應的處理。整個過程是流水線的,處理完一個請求,才能去擷取并處理後面的請求;當然我們可以把擷取socket和處理socket的過程分開,一個線程負責accept,線程池負責處理請求。
NIO為我們提供了更好的解決方案,采用選擇器(Selector)找出已經準備好讀寫的socket,并按順序處理,基于通道(Channel)和緩沖區(Buffer)來傳輸和儲存資料。
Buffer和Channel已經介紹過深入淺出NIO Channel和Buffer,本文主要介紹NIO的Selector和Socket的實踐以及實作原理。
Selector是什麼?
在養雞場,有這一個人,每天的工作就是不停檢查幾個特殊的雞籠,如果有雞進來,有雞出去,有雞生蛋,有雞生病等等,就把相應的情況記錄下來。這樣,如果負責人想知道雞場情況,隻需要到那個人查詢即可,當然前提是,負責得讓那個人知道需要記錄哪些情況。
Selector的作用相當這個人的工作,每個雞籠相當于一個SocketChannel,單個線程通過Selector可以管理多個SocketChannel。
A Thread uses a Selector to handle 3 Channels
為了實作Selector管理多個SocketChannel,必須将多個具體的SocketChannel對象注冊到Selector對象,并聲明需要監聽的事件,目前有4種類型的事件:
connect:用戶端連接配接服務端事件,對應值為SelectionKey.OP_CONNECT(8)
accept:服務端接收用戶端連接配接事件,對應值為SelectionKey.OP_ACCEPT(16)
read:讀事件,對應值為SelectionKey.OP_READ(1)
write:寫事件,對應值為SelectionKey.OP_WRITE(4)
當SocketChannel有對應的事件發生時,Selector能夠覺察到并進行相應的處理。
為了更好地了解NIO Socket,先來看一段服務端的示例代碼
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port));
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){
int n = selector.select();
if (n == 0) continue;
Iterator ite = this.selector.selectedKeys().iterator();
while(ite.hasNext()){
SelectionKey key = (SelectionKey)ite.next();
if (key.isAcceptable()){
SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
clntChan.configureBlocking(false);
//将選擇器注冊到連接配接到的用戶端信道,
//并指定該信道key值的屬性為OP_READ,
//同時為該信道指定關聯的附件
clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
}
if (key.isReadable()){
handleRead(key);
}
if (key.isWritable() && key.isValid()){
handleWrite(key);
}
if (key.isConnectable()){
System.out.println("isConnectable = true");
}
ite.remove();
}
}
服務端連接配接過程
1、建立ServerSocketChannel執行個體serverSocketChannel,并bind到指定端口。
2、建立Selector執行個體selector;
3、将serverSocketChannel注冊到selector,并指定事件OP_ACCEPT。
4、while循環執行:
4.1、調用select方法,該方法會阻塞等待,直到有一個或多個通道準備好了I/O操作或等待逾時。
4.2、擷取選取的鍵清單;
4.3、循環鍵集中的每個鍵:
4.3.a、擷取通道,并從鍵中擷取附件(如果添加了附件);
4.3.b、确定準備就緒的操縱并執行,如果是accept操作,将接收的信道設定為非阻塞模式,并注冊到選擇器;
4.3.c、如果需要,修改鍵的興趣操作集;
4.3.d、從已選鍵集中移除鍵
在步驟3中,selector隻注冊了serverSocketChannel的OP_ACCEPT事件
- 如果有用戶端A連接配接服務,執行select方法時,可以通過serverSocketChannel擷取用戶端A的socketChannel,并在selector上注冊socketChannel的OP_READ事件。
- 如果用戶端A發送資料,會觸發read事件,這樣下次輪詢調用select方法時,就能通過socketChannel讀取資料,同時在selector上注冊該socketChannel的OP_WRITE事件,實作伺服器往用戶端寫資料。
NIO Socket實作原理
SocketChannel、ServerSocketChannel和Selector的執行個體初始化都通過SelectorProvider類實作,其中Selector是整個NIO Socket的核心實作。
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
SelectorProvider在windows和linux下有不同的實作,provider方法會傳回對應的實作。
Selector分析
Selector是如何做到同時管理多個socket?
Selector初始化時,會執行個體化PollWrapper、SelectionKeyImpl數組和Pipe。
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
wakeupPipe = Pipe.open();
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
// Disable the Nagle algorithm so that the wakeup is more immediate
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);
wakeupSinkFd = ((SelChImpl)sink).getFDVal();
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
pollWrapper用Unsafe類申請一塊實體記憶體,存放注冊時的socket句柄fdVal和event的資料結構pollfd,其中pollfd共8位,0~3位儲存socket句柄,4~7位儲存event。
pollfd
pollWrapper
pollWrapper提供了fdVal和event資料的相應操作,如添加操作通過Unsafe的putInt和putShort實作。
void putDescriptor(int i, int fd) {
pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}
void putEventOps(int i, int event) {
pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
}
SelectionKeyImpl儲存注冊時的channel、selector、event以及儲存在pollWrapper的偏移位置index。
先看看
serverChannel.register(selector, SelectionKey.OP_ACCEPT)
是如何實作的:
public final SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException {
synchronized (regLock) {
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
- 如果該channel和selector已經注冊過,則直接添加事件和附件。
- 否則通過selector實作注冊過程。
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops, Object attachment) {
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
synchronized (publicKeys) {
implRegister(k);
}
k.interestOps(ops);
return k;
}
protected void implRegister(SelectionKeyImpl ski) {
synchronized (closeLock) {
if (pollWrapper == null)
throw new ClosedSelectorException();
growIfNeeded();
channelArray[totalChannels] = ski;
ski.setIndex(totalChannels);
fdMap.put(ski);
keys.add(ski);
pollWrapper.addEntry(totalChannels, ski);
totalChannels++;
}
}
- 以目前channel和selector為參數,初始化 SelectionKeyImpl 對象selectionKeyImpl ,并添加附件attachment。
- 如果目前channel的數量totalChannels等于SelectionKeyImpl數組大小,對SelectionKeyImpl數組和pollWrapper進行擴容操作。
- 如果totalChannels % MAX_SELECTABLE_FDS == 0,則多開一個線程處理selector。
- pollWrapper.addEntry将把selectionKeyImpl中的socket句柄添加到對應的pollfd。
- k.interestOps(ops)方法最終也會把event添加到對應的pollfd。
是以,不管serverSocketChannel,還是socketChannel,在selector注冊事件後,最終都儲存在pollArray中。
接着,再來看看selector中的select是如何實作一次擷取多個有事件發生的channel的。
底層由selector實作類的doSelect方法實作,如下:
protected int doSelect(long timeout) throws IOException {
if (channelArray == null)
throw new ClosedSelectorException();
this.timeout = timeout; // set selector timeout
processDeregisterQueue();
if (interruptTriggered) {
resetWakeupSocket();
return 0;
}
// Calculate number of helper threads needed for poll. If necessary
// threads are created here and start waiting on startLock
adjustThreadsCount();
finishLock.reset(); // reset finishLock
// Wakeup helper threads, waiting on startLock, so they start polling.
// Redundant threads will exit here after wakeup.
startLock.startThreads();
// do polling in the main thread. Main thread is responsible for
// first MAX_SELECTABLE_FDS entries in pollArray.
try {
begin();
try {
subSelector.poll();
} catch (IOException e) {
finishLock.setException(e); // Save this exception
}
// Main thread is out of poll(). Wakeup others and wait for them
if (threads.size() > 0)
finishLock.waitForHelperThreads();
} finally {
end();
}
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
finishLock.checkForException();
processDeregisterQueue();
int updated = updateSelectedKeys();
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
resetWakeupSocket();
return updated;
}
其中 subSelector.poll() 是select的核心,由native函數poll0實作,readFds、writeFds 和exceptFds數組用來儲存底層select的結果,數組的第一個位置都是存放發生事件的socket的總數,其餘位置存放發生事件的socket句柄fd。
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
private int poll() throws IOException{ // poll for the main thread
return poll0(pollWrapper.pollArrayAddress,
Math.min(totalChannels, MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
執行 selector.select() ,poll0函數把指向socket句柄和事件的記憶體位址傳給底層函數。
- 如果之前沒有發生事件,程式就阻塞在select處,當然不會一直阻塞,因為epoll在timeout時間内如果沒有事件,也會傳回。
- 一旦有對應的事件發生,poll0方法就會傳回。
- processDeregisterQueue方法會清理那些已經cancelled的SelectionKey
- updateSelectedKeys方法統計有事件發生的SelectionKey數量,并把符合條件發生事件的SelectionKey添加到selectedKeys哈希表中,提供給後續使用。
在早期的JDK1.4和1.5 update10版本之前,Selector基于select/poll模型實作,是基于IO複用技術的非阻塞IO,不是異步IO。在JDK1.5 update10和linux core2.6以上版本,sun優化了Selctor的實作,底層使用epoll替換了select/poll。
epoll原理
epoll是Linux下的一種IO多路複用技術,可以非常高效的處理數以百萬計的socket句柄。
先看看使用c封裝的3個epoll系統調用:
-
int epoll_create(int size)
epoll_create建立一個epoll對象。參數size是核心保證能夠正确處理的最大句柄數,多于這個最大數時核心可不保證效果。
-
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
epoll_ctl可以操作epoll_create建立的epoll,如将socket句柄加入到epoll中讓其監控,或把epoll正在監控的某個socket句柄移出epoll。
-
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout)
epoll_wait在調用時,在給定的timeout時間内,所監控的句柄中有事件發生時,就傳回使用者态的程序。
大概看看epoll内部是怎麼實作的:
- epoll初始化時,會向核心注冊一個檔案系統,用于存儲被監控的句柄檔案,調用epoll_create時,會在這個檔案系統中建立一個file節點。同時epoll會開辟自己的核心高速緩存區,以紅黑樹的結構儲存句柄,以支援快速的查找、插入、删除。還會再建立一個list連結清單,用于存儲準備就緒的事件。
- 當執行epoll_ctl時,除了把socket句柄放到epoll檔案系統裡file對象對應的紅黑樹上之外,還會給核心中斷處理程式注冊一個回調函數,告訴核心,如果這個句柄的中斷到了,就把它放到準備就緒list連結清單裡。是以,當一個socket上有資料到了,核心在把網卡上的資料copy到核心中後,就把socket插入到就緒連結清單裡。
- 當epoll_wait調用時,僅僅觀察就緒連結清單裡有沒有資料,如果有資料就傳回,否則就sleep,逾時時立刻傳回。
epoll的兩種工作模式:
- LT:level-trigger,水準觸發模式,隻要某個socket處于readable/writable狀态,無論什麼時候進行epoll_wait都會傳回該socket。
- ET:edge-trigger,邊緣觸發模式,隻有某個socket從unreadable變為readable或從unwritable變為writable時,epoll_wait才會傳回該socket。
socket讀資料
socket寫資料
read實作
通過周遊selector中的SelectionKeyImpl數組,擷取發生事件的socketChannel對象,其中儲存了對應的socket句柄,實作如下。
public int read(ByteBuffer buf) throws IOException {
if (buf == null)
throw new NullPointerException();
synchronized (readLock) {
if (!ensureReadOpen())
return -1;
int n = 0;
try {
begin();
synchronized (stateLock) {
if (!isOpen()) {
return 0;
}
readerThread = NativeThread.current();
}
for (;;) {
n = IOUtil.read(fd, buf, -1, nd);
if ((n == IOStatus.INTERRUPTED) && isOpen()) {
// The system call was interrupted but the channel
// is still open, so retry
continue;
}
return IOStatus.normalize(n);
}
} finally {
readerCleanup(); // Clear reader thread
// The end method, which
end(n > 0 || (n == IOStatus.UNAVAILABLE));
// Extra case for socket channels: Asynchronous shutdown
//
synchronized (stateLock) {
if ((n <= 0) && (!isInputOpen))
return IOStatus.EOF;
}
assert IOStatus.check(n);
}
}
}
通過Buffer的方式讀取socket的資料。
wakeup實作
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {
setWakeupSocket();
interruptTriggered = true;
}
}
return this;
}
// Sets Windows wakeup socket to a signaled state.
private void setWakeupSocket() {
setWakeupSocket0(wakeupSinkFd);
}
private native void setWakeupSocket0(int wakeupSinkFd);
看來wakeupSinkFd這個變量是為wakeup方法使用的。
其中interruptTriggered為中斷已觸發标志,當pollWrapper.interrupt()之後,該标志即為true了;因為這個标志,連續兩次wakeup,隻會有一次效果。
為了實作client和server的資料互動,Linux下采用管道pipe實作,windows下采用兩個socket之間的通信進行實作,它們都有這樣的特性:
- 都有兩個端,一個 是read端,一個是write端,windows中兩個socket也是read和write的角色。
- 當往write端寫入 資料,則read端即可以收到資料。
熬夜不易,點選請老王喝杯烈酒!!!!!!!