天天看點

channelinactive觸發後不關閉channel_深入了解NIO系列 - Channel詳解Channel簡介ServerSocketChannelSocketChannelDatagramChannelFileChannel總結FYI

Channel簡介

在Java NIO中,主要有三大基本的元件:Buffer、Channel和Selector,前面兩篇文章我們具體介紹了Selector和Buffer,老規矩,就讓我們繼續慢慢地揭開Channel的神秘面紗吧!

在Java NIO的世界中,Selector是中央控制器,Buffer是承載資料的容器,而Channel可以說是最基礎的門面,它是本地I/O裝置、網絡I/O的通信橋梁,隻有搭建了這座橋梁,資料才能被寫入Buffer,連接配接才能被Selector控制,

Channel這座橋梁分别為本地I/O裝置和網絡I/O提供了以下實作,并且和Java IO體系的類是一一對應的:

  1. 網絡I/O裝置:
  • DatagramChannel:讀寫UDP通信的資料,對應DatagramSocket類
  • SocketChannel:讀寫TCP通信的資料,對應Socket類
  • ServerSocketChannel:監聽新的TCP連接配接,并且會建立一個可讀寫的SocketChannel,對應ServerSocket類
  1. 本地I/O裝置:
  • FileChannel:讀寫本地檔案的資料,不支援Selector控制,對應File類

其類繼承結構如下圖:

channelinactive觸發後不關閉channel_深入了解NIO系列 - Channel詳解Channel簡介ServerSocketChannelSocketChannelDatagramChannelFileChannel總結FYI

Channel類繼承結構圖

  1. 從上圖中我們可以看出前面講述的四個類都是被定義為抽象的,這些類中隻是聲明了可操作的接口;主要是在不同的作業系統當中,其實際操作本地I/O和網絡I/O在實作上會有根本性的差異,就拿Windows和Unix來說,兩者的檔案系統管理是不一緻的(想了解三者I/O架構上的差別可參考Unix,Linux,Windows的IO架構)
  2. Channel接口實作了Closeable接口,并且本身還定義isOpen()方法,辨別所有的Channel都是可以被主動關閉
  3. InterruptibleChannel接口聲明了Channel是可以被中斷的
  4. SelectableChannel接口聲明了Channel是可以被選擇的(即支援Selector控制),而FileChannel是沒有實作該接口的
  5. WritableByteChannel和ReadableByteChannel接口分别提供了寫操作和讀操作的API,且是基于Buffer的
  6. ScatteringByteChannel和GatheringByteChannel接口允許您委托作業系統來完成辛苦活:将讀取到的資料分開存放到多個存儲桶(bucket)或者将不同的資料區塊合并成一個整體。這是一個巨大的成就,因為作業系統已經被高度優化來完成此類工作了。它節省了您來回移動資料的工作,也就避免了緩沖區拷貝和減少了您需要編寫、調試的代碼數量。其分别定義了write(ByteBuffer[] srcs, int offset, int length)和read(ByteBuffer[] dsts, int offset, int length)
  7. SeekableByteChannel接口用于控制本地檔案的position
  8. NetworkChannel接口辨別了該Channel是屬于網絡I/O

ServerSocketChannel

讓我們從最簡單的ServerSocketChannel來開始對socket通道類的讨論。以下是ServerSocketChannel的完整 API:

public abstract class ServerSocketChannel extends AbstractSelectableChannel { public static ServerSocketChannel open() throws IOException public abstract ServerSocket socket(); public abstract SocketChannel accept() throws IOException; //支援的SelectionKey類型,傳回OP_ACCEPT public final int validOps()}
           

ServerSocketChannel與ServerSocket一樣是socket監聽器,其主要差別前者可以運作在非阻塞模式下運作;

// 建立一個ServerSocketChannel,将會關聯一個未綁定的ServerSocket public static ServerSocketChannel open() throws IOException { return SelectorProvider.provider().openServerSocketChannel(); }
           

ServerSocketChannel的建立也是依賴底層作業系統實作,其實作類主要是ServerSocketChannelImpl,我們來看看其構造方法

class ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl { private final FileDescriptor fd; private int fdVal; // 這裡忽略一些變量 ..... private int state = -1; ServerSocketChannelImpl(SelectorProvider var1) throws IOException { super(var1); // 建立一個檔案操作符 this.fd = Net.serverSocket(true); // 得到檔案操作符是索引 this.fdVal = IOUtil.fdVal(this.fd); this.state = 0; }
           

建立一個ServerSocketChannelImpl其本質是在底層作業系統建立了一個fd(即檔案描述符),相當于建立了一個用于網絡通信的通道,通過這個通道我們可以和外部網絡進行通信;

當然上述操作隻是搶占了一個通道,它是無法和外部通信的;我們知道,在實際網絡互動中,必須通過端口才能通信,是以呢,下一步我們來看看如何綁定端口

ServerSocketChannel貌似沒有bind()方法來綁定端口,上面我們提到它在建立時會建立一個fd,其本質對應了ServetSocket對象,我們看ServerSocketChannel的API能看到通過socket()對象能擷取到ServetSocket,此時我們隻要調用socket的bind()方法綁定即可

ServerSocketChannel#socket#bind(InetSocketAddress)
           

ServerSocketChannel最主要的作用就是用于監聽TCP連接配接,其API中也有相應的accept()方法來擷取TCP連接配接

public SocketChannel accept() throws IOException { // 忽略一些校驗及無關代碼 ....  SocketChannelImpl var2 = null; // var3的作用主要是說明目前的IO狀态,主要有 /** * EOF = -1; * UNAVAILABLE = -2; * INTERRUPTED = -3; * UNSUPPORTED = -4; * THROWN = -5; * UNSUPPORTED_CASE = -6; */ int var3 = 0; // 這裡本質也是用fd來擷取連接配接 FileDescriptor var4 = new FileDescriptor(); // 用來存儲TCP連接配接的位址資訊 InetSocketAddress[] var5 = new InetSocketAddress[1];  try { // 這裡設定了一個中斷器,中斷時會将連接配接關閉 this.begin(); // 這裡當IO被中斷時,會重新擷取連接配接 do { var3 = this.accept(this.fd, var4, var5); } while(var3 == -3 && this.isOpen()); }finally { // 當連接配接被關閉且accept失敗時或抛出AsynchronousCloseException this.end(var3 > 0); // 驗證連接配接是可用的 assert IOStatus.check(var3); }  if (var3 < 1) { return null; } { // 預設連接配接是阻塞的 IOUtil.configureBlocking(var4, true); // 建立一個SocketChannel的引用 var2 = new SocketChannelImpl(this.provider(), var4, var5[0]); // 下面是是否連接配接成功校驗,這裡忽略...  return var2; }}// 依賴底層作業系統實作的accept0方法private int accept(FileDescriptor var1, FileDescriptor var2, InetSocketAddress[] var3) throws IOException { return this.accept0(var1, var2, var3);}
           

SocketChannel

使用ServerSocketChannel可以實時擷取到建立的TCP連接配接,從上面accpet()方法得出,其傳回的是一個SocketChannelImpl對象,其繼承的類的是SocketChannel,以下是SocketChannel的API:

public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel { // 這裡僅列出部分API public static SocketChannel open() throws IOException public static SocketChannel open(InetSocketAddress remote) throws IOException public abstract Socket socket(); public abstract boolean connect (SocketAddress remote) throws IOException; // 目前的連接配接channel是否有并發連接配接,非阻塞狀态下才有可能傳回true public abstract boolean isConnectionPending(); //調用finishConnect()方法來完成連接配接過程,該方法任何時候都可以安全地進行調用。假如在一個非阻塞模式的SocketChannel對象上調用finishConnect()方法,将可能出現下列情形之一: /** * 1.connect()方法尚未被調用。那麼将産生NoConnectionPendingException異常。 * 2.連接配接建立過程正在進行,尚未完成。那麼什麼都不會發生,finishConnect()方法會立即傳回false值。 * 3.在非阻塞模式下調用connect()方法之後,SocketChannel又被切換回了阻塞模式。那麼如果有必要的話,調用線程會阻塞直到連接配接建立完成,finishConnect()方法接着就會傳回true值。 * 4.在初次調用connect()或最後一次調用finishConnect()之後,連接配接建立過程已經完成。那麼SocketChannel對象的内部狀态将被更新到已連接配接狀态,finishConnect()方法會傳回true值,然後SocketChannel對象就可以被用來傳輸資料了。 * 5.連接配接已經建立。那麼什麼都不會發生,finishConnect()方法會傳回true值。 */ public abstract boolean finishConnect() throws IOException; // 是否連接配接成功 public abstract boolean isConnected();  // 支援的SelectionKey類型,傳回OP_CONNECT,OP_READ,OP_WRITE public final int validOps();  public abstract int read(ByteBuffer dst) throws IOException; public abstract int write(ByteBuffer src) throws IOException;}
           

上文我們提到SocketChannel是用于讀寫TCP通信的資料,與Socket類一緻,其封裝的是點對點、有序的網絡連接配接;一個SocketChannel的建立必然伴随着會建立一個同等的Socket對象(實際是SocketAdaptor),通過socket方法能擷取;

從API的方法名我們不難看出,其主要作用是

  1. 通過open方法建立SocketChannel,
  2. 然後利用connect方法來和服務端發起建立連接配接,還支援了一些判斷連接配接建立情況的方法;
  3. read和write支援最基本的讀寫操作

下面跟随着上述4點讓我們來探究下其底層是怎麼工作。

(1)open建立過程

public static SocketChannel open() throws IOException { return SelectorProvider.provider().openSocketChannel(); }
           

與ServerSocketChannel一樣,SocketChannel的建立也是依賴底層作業系統實作,其實作類主要是SocketChannelImpl,建立過程比較簡單,執行個體化了一個fd,并将目前Channel的狀态置為了未連接配接

class SocketChannelImpl extends SocketChannel implements SelChImpl{ // Our file descriptor object private final FileDescriptor fd;  // fd value needed for dev/poll. This value will remain valid // even after the value in the file descriptor object has been set to -1 private final int fdVal;  // Lock held by current reading or connecting thread private final Object readLock = new Object(); // Lock held by current writing or connecting thread private final Object writeLock = new Object(); // Lock held by any thread that modifies the state fields declared below // DO NOT invoke a blocking I/O operation while holding this lock! private final Object stateLock = new Object(); // State, increases monotonically private static final int ST_UNINITIALIZED = -1; private static final int ST_UNCONNECTED = 0; private static final int ST_PENDING = 1; private static final int ST_CONNECTED = 2; private static final int ST_KILLPENDING = 3; private static final int ST_KILLED = 4; private int state = ST_UNINITIALIZED;  SocketChannelImpl(SelectorProvider sp) throws IOException { super(sp); // 建立一個scoket通道,即fd(fd的作用可參考上面的描述) this.fd = Net.socket(true); // 得到該fd的索引 this.fdVal = IOUtil.fdVal(fd); this.state = ST_UNCONNECTED; }}
           

(2)connect建立連接配接

// Channel的連接配接過程,這隻附了關鍵部分代碼public boolean connect(SocketAddress sa) throws IOException { // 讀寫都鎖住 lock(readLock&writeLock) { /****狀态檢查,channel和address****/ // 判斷channel是否 ensureOpenAndUnconnected(); InetSocketAddress isa = Net.checkAddress(sa); /****連接配接建立****/ // 阻塞狀态變更的鎖也鎖住 lock(blockingLock) {  lock(stateLock) { // 如果目前socket未綁定本地端口,則嘗試着判斷和服務端是否能建立連接配接 if (localAddress == null) { // 和遠端建立連接配接後關閉連接配接,待會我們詳細說一下這個方法 NetHooks.beforeTcpConnect(fd,isa.getAddress(),isa.getPort()); } }  for(;;) { // 建立連接配接 n = Net.connect(fd , ia , isa.getPort()); // 中斷會重新嘗試 if((n == IOStatus.INTERRUPTED) && isOpen()) { continue; } break; } } /****狀态變更****/ lock(stateLock) { if(n > 0) { state = ST_CONNECTED; return; }  // 如果連接配接尚未建立成功,且目前channel是非阻塞的,狀态置為pending,此時不允許其他調用,調用時會抛ConnectionPendingException if (!isBlocking())  state = ST_PENDING; } } }// 這裡補充介紹一下NetHooks.beforeTcpConnect方法,這個方法在其他地方也可能遇到/**********其調用鍊路如下**********/// 1. NetHooks#beforeTcpConnect// 直接被代理跳轉到SdpProviderprovider.implBeforeTcpConnect(fdObj, address, port);// 2. SdpProvider#implBeforeTcpConnect// 主要是先通過規則校驗器判斷入參是否符合,一般有PortRangeRule校驗器// 然後再執行将fd轉換為socketfor (Rule rule: rules) { if (rule.match(action, address, port)) { SdpSupport.convertSocket(fdObj); matched = true; break; }}// 3. SdpSupport#convertSocket// 擷取fd的索引int fdVal = fdAccess.get(fd);// 跳轉到native方法convert0(fdVal);// 4. SdpSupport.c#convert0JNIEXPORT void JNICALLJava_sun_net_sdp_SdpSupport_convert0(JNIEnv *env, jclass cls, int fd){ // create方法實際是通過socket(AF_INET_SDP, SOCK_STREAM, 0);方法得到一個socket int s = create(env);  if (s >= 0) { socklen_t len; int arg, res; struct linger linger; /* copy socket options that are relevant to SDP */ len = sizeof(arg); // 重用TIME_WAIT的端口 if (getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, &len) == 0) setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, len); len = sizeof(arg); // 緊急資料放入普通資料流 if (getsockopt(fd, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, &len) == 0) setsockopt(s, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, len); len = sizeof(linger); // 延遲關閉連接配接 if (getsockopt(fd, SOL_SOCKET, SO_LINGER, (void*)&linger, &len) == 0) setsockopt(s, SOL_SOCKET, SO_LINGER, (char*)&linger, len); // 将fd也引用到s所持有的通道 RESTARTABLE(dup2(s, fd), res); if (res < 0) JNU_ThrowIOExceptionWithLastError(env, "dup2"); // 執行close方法,關閉s這個引用 RESTARTABLE(close(s), res); }}
           

(3)read

ok,現在Channel已經通過connect方法與伺服器連接配接好了連接配接,下面我們開始試着讀寫channel吧

讓我們回顧一下,在[Select詳解]文章中,我們寫了個簡單的Selector的例子,其中就包括了讀和寫操作,我們先看看讀操作:

.....while ((ret = socketChannel.read(buf)) > 0){ readBytes += ret;}.....
           

從具體socketChannel中讀取内容至buf,傳回的是目前IO的狀态,讓我們來探究下源碼

1.SocketChannelImpl#read(ByteBuffer buf){ lock(readLock) { // 如果buf=null,抛出NullPointerException .... // 這裡有個判斷,當channel被關閉時直接傳回0 ..... // 核心讀邏輯 for (;;) { // 通過IOUtil的讀取fd的資料至buf // 這裡的nd是SocketDispatcher,用于調用底層的read和write操作 n = IOUtil.read(fd, buf, -1, nd); if ((n == IOStatus.INTERRUPTED) && isOpen()) { continue; } // 這個方法主要是将UNAVAILABLE(原為-2)這個狀态傳回0,否則傳回n return IOStatus.normalize(n); } }}2.IOUtil.read(fd , buf , position , nd){ // 如果buf是隻可讀,則抛出異常 throw IllegalArgumentException("Read-only buffer"); if (dst instanceof DirectBuffer) return readIntoNativeBuffer(fd, buf, position, nd);  // 臨時緩沖區,大小為buf的remain(limit - position),堆外記憶體,使用ByteBuffer.allocateDirect(size)配置設定 // Notes:這裡配置設定後後面有個try-finally塊會釋放該部分記憶體 ByteBuffer bb = Util.getTemporaryDirectBuffer(buf.remaining()); // 将網絡中的buf讀進direct buffer int n = readIntoNativeBuffer(fd, bb, position, nd); // 待讀取 bb.flip(); if (n > 0) // 成功時寫入 buf.put(bb); return n;}3.IOUtil.readIntoNativeBuffer(fd , buf , position , nd){ // ... 忽略一些擷取buf變量的代碼  if (position != -1) { // pread方法隻有在同步狀态下才能使用 n = nd.pread(fd ,((DirectBuffer)bb).address() + pos,rem, position); } else { // 其調用SocketDispatcher.read方法 -> FileDispatcherImpl.read0方法 n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem); }}4.FileDispatcherImpl.read0{ // 擷取fd索引 jint fd = fdval(env, fdo); void *buf = (void *)jlong_to_ptr(address);  // 調用底層read方法 return convertReturnVal(env, read(fd, buf, len), JNI_TRUE);}
           

總結一下讀取的過程

  1. 初始化一個direct buffer,如果本身的buffer就是direct的則不用初始化
  2. 調用底層read方法寫入至direct buffer
  3. 最終将direct buffer寫到傳入的buffer對象

(4)write

ok,看完了讀的過程,我們在看看寫的過程;還是之前例子中的寫入代碼,如下

if (buf.hasRemaining()) { socketChannel.write(buf);}
           

繼續探究下源碼

1.SocketChannelImpl#write(ByteBuffer buf){ lock(writeLock) { //... for (;;) { // 通過IOUtil的讀取fd的資料至buf // 這裡的nd是SocketDispatcher,用于調用底層的read和write操作 n = IOUtil.write(fd, buf, -1, nd); if ((n == IOStatus.INTERRUPTED) && isOpen()) { continue; } // 這個方法主要是将UNAVAILABLE(原為-2)這個狀态傳回0,否則傳回n return IOStatus.normalize(n); } }}2.IOUtil.write(fd, buf, position, nd){ // .... if (src instanceof DirectBuffer) return writeFromNativeBuffer(fd, buf, position, nd);  ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);  bb.put(buf); bb.flip(); // 這裡的pos為buf初始的position,意思是将buf重置為最初的狀态;因為目前還沒有真實的寫入到channel中 buf.position(pos);  // 調用 int n = writeFromNativeBuffer(fd, bb, position, nd); if (n > 0) { buf.position(pos + n); }}3.IOUtil.writeFromNativeBuffer(fd , buf , position , nd){ // ... 忽略一些擷取buf變量的代碼  int written = 0; if (position != -1) { // pread方法隻有在同步狀态下才能使用 written = nd.pwrite(fd ,((DirectBuffer)bb).address() + pos,rem, position); } else { // 其調用SocketDispatcher.write方法 -> FileDispatcherImpl.write0方法 written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem); } //....}4.FileDispatcherImpl.write0{ // 調用底層的write方法寫入 return convertReturnVal(env, write(fd, buf, len), JNI_FALSE);}
           

總結一下write的過程:

  1. 如果buf是direct buffer則直接開始寫入,否則需要初始化一個direct buffer,大小是buf的remain
  2. 将buf的内容寫入到direct buffer中,并恢複buf的position
  3. 調用底層的write方法寫入至channel
  4. 更新buf的position,即被direct buffer讀取内容後的position

DatagramChannel

DatagramChannel是NIO中面向Datagram(資料報)的套接字通道.

一般我們在實際程式設計中用到這個Channel的情況很少,是以我在這裡就不詳細說明了,有興趣的同學可以通過Java NIO深入了解DatagramChannel這篇文章了解

FileChannel

FileChannel是線程安全的,隻能通過FileInputStream,FileOutputStream,RandomAccessFile的getChannel方法擷取FileChannel通道,原理是擷取到底層作業系統生成的fd(file descriptor)

FileChannel主要是對本地檔案操作的NIO中的一套新的機制,後面我們再配合IO部分的内容來仔細研究它

總結

這篇文章主要是介紹了Channel通道類在NIO程式設計中的作用,并主要講述了ServerSocketChannel和SocketChannel這兩個Channel的底層工作機制,總結一下上面的關鍵點

  1. ServerSocketChannel隻支援accept事件,SocketChannel隻支援connect、read、write事件
  2. SocketChannel的讀取和寫入都需要依賴direct buffer來做中間轉換
  3. SocketChannel在connect之前會調用NetHooks#beforeTcpConnect

FYI

  • Java NIO Channel詳解
  • Java NIO 的前生今世 之二 NIO Channel 小結
  • c語言 - socket程式設計(三)
  • C++ socket TCP開發基本流程總結

微信搜尋公衆号"一隻懶懶的coder"可關注我擷取最新動态哦!!