前面介紹完了org.apache.mina.core.session這個包,現在開始進入org.apache.mina.core. polling包。這個包裡包含了實作基于輪詢政策(比如NIO的select調用或其他類型的I/O輪詢系統調用(如epoll,poll,kqueue等)的基類。
先來看AbstractPollingIoAcceptor這個抽象基類,它繼承自AbstractIoAcceptor,兩個泛型參數分别是所處理的會話和伺服器端socket連接配接。底層的sockets會被不斷檢測,并當有任何一個socket需要被處理時就會被喚醒去處理。這個類封裝了伺服器端socket的bind,accept和dispose等動作,其成員變量Executor負責接受來自用戶端的連接配接請求,另一個AbstractPollingIoProcessor用于處理用戶端的I/O操作請求,如讀寫和關閉連接配接。
其最重要的幾個成員變量是:
private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();//注冊隊列
private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();//取消注冊隊列
private final Map<SocketAddress, H> boundHandles = Collections
.synchronizedMap(new HashMap<SocketAddress, H>());//本地位址到伺服器socket的映射表
先來看看當服務端調用bind後的處理過程:
複制代碼
protected final Set<SocketAddress> bind0(
List<? extends SocketAddress> localAddresses) throws Exception {
AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);//注冊請求
registerQueue.add(request);//加入注冊隊列中,等待worker處理
//建立一個Worker執行個體,開始工作
startupWorker();
wakeup();
request.awaitUninterruptibly();
// 更新本地綁定位址
Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
for (H handle : boundHandles.values()) {
newLocalAddresses.add(localAddress(handle));
}
return newLocalAddresses;
}
真正的負責接收用戶端請求的工作都是Worker線程完成的,
private class Worker implements Runnable {
public void run() {
int nHandles = 0;
while (selectable) {
try {
// Detect if we have some keys ready to be processed
boolean selected = select();//檢測是否有SelectionKey已經可以被處理了
nHandles += registerHandles();//注冊伺服器sockets句柄,這樣做的目的是将Selector的狀态置于OP_ACCEPT,并綁定到所監聽的端口上,表明接受了可以接收的來自用戶端的連接配接請求,
if (selected) {
processHandles(selectedHandles());//處理可以被處理的SelectionKey狀态為OP_ACCEPT的伺服器socket句柄集(即真正處理來自用戶端的連接配接請求)
}
nHandles -= unregisterHandles();//檢查是否有取消連接配接的用戶端請求
if (nHandles == 0) {
synchronized (lock) {
if (registerQueue.isEmpty()
&& cancelQueue.isEmpty()) {//完成工作
worker = null;
break;
}
}
} catch (Throwable e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
try {
Thread.sleep(1000);//線程休眠一秒
} catch (InterruptedException e1) {
ExceptionMonitor.getInstance().exceptionCaught(e1);
}
}
if (selectable && isDisposing()) {//釋放資源
selectable = false;
if (createdProcessor) {
processor.dispose();
} finally {
synchronized (disposalLock) {
if (isDisposing()) {
destroy();
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
} finally {
disposalFuture.setDone();
private int registerHandles() {//注冊伺服器sockets句柄
for (;;) {
AcceptorOperationFuture future = registerQueue.poll();
Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
List<SocketAddress> localAddresses = future.getLocalAddresses();
try {
for (SocketAddress a : localAddresses) {
H handle = open(a);//打開指定位址,傳回伺服器socket句柄
newHandles.put(localAddress(handle), handle);//加入位址—伺服器socket映射表中
boundHandles.putAll(newHandles);//更新本地綁定位址集
// and notify.
future.setDone();//完成注冊過程
return newHandles.size();
} catch (Exception e) {
future.setException(e);
} finally {
// Roll back if failed to bind all addresses.
if (future.getException() != null) {
for (H handle : newHandles.values()) {
try {
close(handle);//關閉伺服器socket句柄
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
wakeup();
private void processHandles(Iterator<H> handles) throws Exception {//處理來自用戶端的連接配接請求
while (handles.hasNext()) {
H handle = handles.next();
handles.remove();
T session = accept(processor, handle);//為一個伺服器socket句柄handle真正接收來自用戶端的請求,在給定的所關聯的processor上傳回會話session
if (session == null) {
break;
finishSessionInitialization(session, null, null);//結束會話初始化
// add the session to the SocketIoProcessor
session.getProcessor().add(session);
這個類中有個地方值得注意,就是wakeup方法,它是用來中斷select方法的,當注冊隊列或取消注冊隊列發生變化時需要調用它,可以參看本類的一個子類NioSocketAcceptor的實作:
protected boolean select() throws Exception {
return selector.select() > 0;
protected void wakeup() {
selector.wakeup();
我們可以查閱jdk文檔,它對Selector的select方法有如下解釋:選擇一組鍵,其相應的通道已為 I/O 操作準備就緒。 此方法執行處于阻塞模式的選擇操作。僅在至少選擇一個通道、調用此選擇器的 wakeup 方法、目前的線程已中斷,或者給定的逾時期滿(以先到者為準)後此方法才傳回。
參考資料
1,《Java NIO非阻塞伺服器示例》
本文轉自Phinecos(洞庭散人)部落格園部落格,原文連結:http://www.cnblogs.com/phinecos/archive/2008/12/08/1350315.html,如需轉載請自行聯系原作者