天天看點

網絡程式設計之SocketChannel & ServerSocketChannel & Selector

在前面的文章中我們講過Socket的讀是阻塞模式,并且讀寫都是一個位元組或者幾個位元組,為了提高響應性能往往需要在服務端為每一個請求配置設定一個線程。後來為了解決傳統Socket的這種阻塞低效方式,在jdk1.4之後引入了New I/o(NIO)。

NIO網絡程式設計中比較重要的概念是緩沖區、通道、選擇器,NIO它是基于緩存區的操作,一次能讀寫一個或者多個資料塊,通道可以以阻塞(blocking)或非阻塞(nonblocking)模式運作。非阻塞模式的通道永遠不會讓調用的線程休眠。請求的操作要麼立即完成,要麼傳回一個結果表明未進行任何操作。在NIO通道的讀寫兩端能直接操作的就是ByteBuffer,緩沖區是可以在不同通道或者同一個通道的讀寫端共用的。與緩沖區不同,通道不能被重複使用,一個打開的通道即代表與一個特定I/O服務的特定連接配接并封裝該連接配接的狀态。當通道關閉時,那個連接配接會丢失,然後通道将不再連接配接任何東西。

Socket和SocketChannel類封裝點對點、有序的網絡連接配接,SocketChannel扮演用戶端發起同一個監聽伺服器的連接配接。直到連接配接成功,它才能收到資料并且隻會從連接配接到的位址接收,每個SocketChannel對象建立時都是同一個對等的java.net.Socket對象串聯的。

而在新建立的SocketChannel上調用socket( )方法能傳回它對等的Socket對象;在該Socket上調用getChannel( )方法則能傳回最初的那個SocketChannel。

如果選擇使用通過在對等Socket對象上調用connect( )方法與服務端建立連接配接,那麼線程在連接配接建立好或逾時過期之前都将保持阻塞。如果您選擇通過在通道上直接調用connect( )方法來建立連接配接并且通道處于阻塞模式(預設模式),那麼使用傳統Socket連接配接過程實際上是一樣的。在SocketChannel上并沒有一種connect( )方法可以讓您指定逾時(timeout)值,當connect( )方法在非阻塞模式下被調用時SocketChannel提供并發連接配接:它發起對請求位址的連接配接并且立即傳回值。如果傳回值是true,說明連接配接立即建立了(這可能是本地環回連接配接);如果連接配接不能立即建立,connect( )方法會傳回false且并發地繼續連接配接建立過程。

而在服務端ServerSocketChannel扮演者服務端通道的角色,它負責監聽伺服器上的一個連接配接,在建立服務端通道的時候需要調用對等的ServerSocket對象綁定到指定的端口上。在傳統的基于流的Socket網絡程式設計中,服務端為每一個請求建立一個線程用于讀寫資料,而使用ServerSocketChannel在服務端程式設計,我們往往配合Selector選擇器使用,在服務端我們将特定的Accpet、Read、Write事件注冊到選擇器上,由選擇器幫我檢查作業系統核心是否可讀和可寫,如有對應的事件滿足要求,選擇器會通知調用者線程。

相對于傳統Socket程式設計,使用基于緩沖區的NIO程式設計在如下幾點不會阻塞調用者線程:

(1)用戶端connect( )方法不會阻塞

(2)服務端accept()方法不會阻塞

(3)Socket的讀read()方法不會阻塞

下面我展示一個常用的NIO網絡程式設計的用戶端和服務端的代碼示例,代碼是經過自己測試可用的,其中我添加了大量的注釋用于說明代碼的意圖:

/**
 * @author yujie.wang
 *	SocketChannel 用戶端代碼測試
 */
public class SocketChannel_Client {
	
	private final static String DEFAULT_HOST = "127.0.0.1";
	
	private final static int DEFAULT_PORT = 4567;
	
	private SocketChannel channel;
	
	private Socket socket;
	
	//配置設定一個大小為50位元組的緩沖區 用于用戶端通道的讀寫
	private ByteBuffer buffer = ByteBuffer.allocate(50);
	
	public SocketChannel_Client(){
		this(DEFAULT_HOST, DEFAULT_PORT);
	}
	
	public SocketChannel_Client(String host, int port){
		init(host,port);
	}
	
	/**
	 * 打開通道并設定對等的用戶端socket對象
	 * 建立與服務端通道的連接配接
	 * @param host
	 * @param port
	 */
	public void init(String host, int port){
		try {
			//打開一個用戶端通道,同時目前通道并沒有與服務端通道建立連接配接
			channel = SocketChannel.open();
			//獲得對等的用戶端socket
			socket = channel.socket();
			//配置用戶端socket
			setSocket();
			//将通道設定為非阻塞工作方式
			channel.configureBlocking(false);
			//異步連接配接,發起連接配接之後就立即傳回
			//傳回true,連接配接已經建立
			//傳回false,後續繼續建立連接配接
			channel.connect(new InetSocketAddress(host,port));
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	/**
	 * 驗證連接配接是否建立
	 */
	public void finishConnect(){
		try {
			while(!channel.finishConnect()){
				// nothing to do,wait connect
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	/**
	 * 驗證目前連接配接是否可用
	 */
	public void isConnected(){
		try {
			if(channel == null || !channel.isConnected())
				throw new IOException("channel is broken");
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	/**
	 * 配置用戶端通道對等的Socket
	 */
	public void setSocket(){
		try {
			if(socket != null ){
				//設定socket 讀取的逾時時間5秒
				//socket.setSoTimeout(5000);
				//設定小資料包不再組合成大包發送,也不再等待前一個資料包傳回确認消息
				socket.setTcpNoDelay(true);
				//設定如果用戶端Socket關閉了,未發送的包直接丢棄
				socket.setSoLinger(true, 0);
			}
		} catch (Exception e) {
			// TODO: handle exception
		}
	}
	
	public void write(String data) {
		buffer.clear();
		buffer.put(data.getBytes());
		buffer.flip();
		try {
			// write并不一定能一次将buffer中的資料都寫入 是以這裡要多次寫入
			// 當多個線程同時調用同一個通道的寫方法時,隻有一個線程能工作,其他現在則會阻塞
			while(buffer.hasRemaining()){
				channel.write(buffer);
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	public void read(){
		try {
			buffer.clear();
			// read方法并不阻塞,如果有資料讀入傳回讀入的位元組數,沒有資料讀入傳回0 ,遇到流的末尾傳回-1
			// 當然這裡和Socket和ServerSocket通信一樣 也會存在消息無邊界的問題 我們這裡就采取簡單的讀取一次作為示例
			System.out.println("read begin");
			channel.read(buffer);
		/*	while(buffer.hasRemaining() && channel.read(buffer) != -1){
				printBuffer(buffer);
			}*/
			buffer.flip();
			printBuffer(buffer);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	/**
	 * 輸出buffer中的資料
	 * @param buffer
	 */
	public void printBuffer(ByteBuffer buffer){
		while(buffer.hasRemaining()){
			System.out.print((char)buffer.get());
		}
		System.out.println("");
		System.out.println("****** Read end ******");
	}
	
	/**
	 * 判斷通道是否打開
	 * @return
	 */
	public boolean isChannelOpen(){
		try {
			return channel.finishConnect() ? true : false;
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return false;
	}
	
	/**
	 * 關閉通道
	 */
	public void closeChannel(){
		if(channel != null){
			try {
				channel.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
	
	public static void main(String[] args) {
		// TODO Auto-generated method stub
	//	client(DEFAULT_HOST,DEFAULT_PORT);
		SocketChannel_Client client = new SocketChannel_Client();
		client.finishConnect();
		System.out.println("connect success");
		client.write("Hello World");
		System.out.println("client write end");
		client.read();
		sleep(15000);
		System.out.println("client exit");
 
	}
	
	public static void sleep(long time){
		try {
			Thread.sleep(time);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}
           

服務端代碼示例:

/**
 * @author yujie.wang
 * ServerSocketChannel 測試用例
 */
public class ServerSocketChannel_Server {

	private final static int DEFAULT_PORT = 4567;
	
	private ServerSocketChannel channel;
	
	private Selector selector;
	
	private ServerSocket serverSocket;
	
	public ServerSocketChannel_Server(){
		this(DEFAULT_PORT);
	}
	
	public ServerSocketChannel_Server(int port){
		init(port);
	}
	
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		ServerSocketChannel_Server server = new ServerSocketChannel_Server();
		server.selector();
		System.out.println("server exit");
	}
	
	public void init(int port){
		try {
			//打開一個服務端通道
			channel = ServerSocketChannel.open();
			//獲得對等的ServerSocket對象
			serverSocket = channel.socket();
			//将服務端ServerSocket綁定到指定端口
			serverSocket.bind(new InetSocketAddress(port));
			System.out.println("Server listening on port: "+ port);
			//将通道設定為非阻塞模式
			channel.configureBlocking(false);
			//打開一個選擇器
			selector = Selector.open();
			//将通道注冊到打開的選擇器上
			channel.register(selector, SelectionKey.OP_ACCEPT);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	public void selector(){
		try {
			while(true){
				System.out.println("begin to select");
				//select()方法會阻塞,直到有準備就緒的通道有準備好的操作;或者目前線程中斷該方法也會傳回
				//這裡的傳回值不是選擇器中已選擇鍵集合中鍵的數量,而是從上一次select()方法調用到這次調用期間進入就緒狀态通道的數量
				int readyKeyCount = selector.select();
				if(readyKeyCount <= 0){
					continue;
				}
				System.out.println("ok select readyCount: "+ readyKeyCount);
				//獲得已選擇鍵的集合這個集合中包含了 新準備就緒的通道和上次調用select()方法已經存在的就緒通道
				Set<SelectionKey> set = selector.selectedKeys();
				Iterator<SelectionKey> iterator = set.iterator();
				while(iterator.hasNext()){
					SelectionKey key = iterator.next();
					//通過調用remove将這個鍵key從已選擇鍵的集合中删除
					iterator.remove();
					if(key.isAcceptable()){
						handleAccept(key);
					}else if(key.isReadable()){
						handleRead(key);
					}else if(key.isWritable()){
						handleWrite(key,"Hello World");
					}
				}
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	/**
	 * 處理用戶端連接配接事件
	 * @param key
	 * @param selector
	 */
	public void handleAccept(SelectionKey key){
		try {
			//因為能注冊SelectionKey.OP_ACCEPT事件的隻有 ServerSocketChannel通道,
			//是以這裡可以直接轉換成ServerSocketChannel
			ServerSocketChannel channel = (ServerSocketChannel)key.channel();
			//獲得用戶端的SocketChannel對象 ,accept這裡不會阻塞,如果沒有連接配接到來,這裡會傳回null
			SocketChannel client = channel.accept();
			System.out.println("Accepted Connected from: "+ client);
			//将用戶端socketChannel設定為非阻塞模式
			client.configureBlocking(false);
			//為該用戶端socket配置設定一個ByteBuffer
			ByteBuffer buffer = ByteBuffer.allocate(50);
			client.register(selector, SelectionKey.OP_READ, buffer);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	/**
	 * 處理讀取資料事件
	 * @param key
	 */
	public void handleRead(SelectionKey key){
		try {
			//從鍵中獲得相應的用戶端socketChannel
			SocketChannel channel = (SocketChannel)key.channel();
			//獲得與用戶端socketChannel關聯的buffer
			ByteBuffer buffer = (ByteBuffer)key.attachment();
			buffer.clear();
			//将資料讀取到buffer中,這裡read方法不會阻塞
			//有資料傳回讀取的位元組數,沒有資料傳回0,遇到流的末尾則傳回-1
			//這裡為了避免消息的無邊界 性 ,是以隻讀取一次資料
			int count = channel.read(buffer);
			System.out.println("read count:"+ count);
			buffer.flip();
			//輸出資料
			printBuffer(buffer);
			buffer.clear();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	/**
	 * 處理寫入資料的時間
	 * @param key
	 * @param data
	 */
	public void handleWrite(SelectionKey key,String data){
		try {
			SocketChannel channel = (SocketChannel)key.channel();
			ByteBuffer buffer = (ByteBuffer)key.attachment();
			buffer.clear();
			buffer.put(data.getBytes());
			buffer.flip();
			while(buffer.hasRemaining()){
				channel.write(buffer);
			}
			buffer.clear();
		} catch (Exception e) {
			// TODO: handle exception
		}
	}

	public static void printBuffer(ByteBuffer buffer){
		while(buffer.hasRemaining()){
			System.out.println("positon: "  + buffer.position()+ " limit:"+ buffer.limit());
			System.out.print((char)buffer.get());
		}
		System.out.println("");
		System.out.println("****** Read end ******");
		System.out.println("");
	}
	
}
           

繼續閱讀