天天看点

网络编程之SocketChannel & ServerSocketChannel & Selector

在前面的文章中我们讲过Socket的读是阻塞模式,并且读写都是一个字节或者几个字节,为了提高响应性能往往需要在服务端为每一个请求分配一个线程。后来为了解决传统Socket的这种阻塞低效方式,在jdk1.4之后引入了New I/o(NIO)。

NIO网络编程中比较重要的概念是缓冲区、通道、选择器,NIO它是基于缓存区的操作,一次能读写一个或者多个数据块,通道可以以阻塞(blocking)或非阻塞(nonblocking)模式运行。非阻塞模式的通道永远不会让调用的线程休眠。请求的操作要么立即完成,要么返回一个结果表明未进行任何操作。在NIO通道的读写两端能直接操作的就是ByteBuffer,缓冲区是可以在不同通道或者同一个通道的读写端共用的。与缓冲区不同,通道不能被重复使用,一个打开的通道即代表与一个特定I/O服务的特定连接并封装该连接的状态。当通道关闭时,那个连接会丢失,然后通道将不再连接任何东西。

Socket和SocketChannel类封装点对点、有序的网络连接,SocketChannel扮演客户端发起同一个监听服务器的连接。直到连接成功,它才能收到数据并且只会从连接到的地址接收,每个SocketChannel对象创建时都是同一个对等的java.net.Socket对象串联的。

而在新创建的SocketChannel上调用socket( )方法能返回它对等的Socket对象;在该Socket上调用getChannel( )方法则能返回最初的那个SocketChannel。

如果选择使用通过在对等Socket对象上调用connect( )方法与服务端建立连接,那么线程在连接建立好或超时过期之前都将保持阻塞。如果您选择通过在通道上直接调用connect( )方法来建立连接并且通道处于阻塞模式(默认模式),那么使用传统Socket连接过程实际上是一样的。在SocketChannel上并没有一种connect( )方法可以让您指定超时(timeout)值,当connect( )方法在非阻塞模式下被调用时SocketChannel提供并发连接:它发起对请求地址的连接并且立即返回值。如果返回值是true,说明连接立即建立了(这可能是本地环回连接);如果连接不能立即建立,connect( )方法会返回false且并发地继续连接建立过程。

而在服务端ServerSocketChannel扮演者服务端通道的角色,它负责监听服务器上的一个连接,在创建服务端通道的时候需要调用对等的ServerSocket对象绑定到指定的端口上。在传统的基于流的Socket网络编程中,服务端为每一个请求创建一个线程用于读写数据,而使用ServerSocketChannel在服务端编程,我们往往配合Selector选择器使用,在服务端我们将特定的Accpet、Read、Write事件注册到选择器上,由选择器帮我检查操作系统内核是否可读和可写,如有对应的事件满足要求,选择器会通知调用者线程。

相对于传统Socket编程,使用基于缓冲区的NIO编程在如下几点不会阻塞调用者线程:

(1)客户端connect( )方法不会阻塞

(2)服务端accept()方法不会阻塞

(3)Socket的读read()方法不会阻塞

下面我展示一个常用的NIO网络编程的客户端和服务端的代码示例,代码是经过自己测试可用的,其中我添加了大量的注释用于说明代码的意图:

/**
 * @author yujie.wang
 *	SocketChannel 客户端代码测试
 */
public class SocketChannel_Client {
	
	private final static String DEFAULT_HOST = "127.0.0.1";
	
	private final static int DEFAULT_PORT = 4567;
	
	private SocketChannel channel;
	
	private Socket socket;
	
	//分配一个大小为50字节的缓冲区 用于客户端通道的读写
	private ByteBuffer buffer = ByteBuffer.allocate(50);
	
	public SocketChannel_Client(){
		this(DEFAULT_HOST, DEFAULT_PORT);
	}
	
	public SocketChannel_Client(String host, int port){
		init(host,port);
	}
	
	/**
	 * 打开通道并设置对等的客户端socket对象
	 * 建立与服务端通道的连接
	 * @param host
	 * @param port
	 */
	public void init(String host, int port){
		try {
			//打开一个客户端通道,同时当前通道并没有与服务端通道建立连接
			channel = SocketChannel.open();
			//获得对等的客户端socket
			socket = channel.socket();
			//配置客户端socket
			setSocket();
			//将通道设置为非阻塞工作方式
			channel.configureBlocking(false);
			//异步连接,发起连接之后就立即返回
			//返回true,连接已经建立
			//返回false,后续继续建立连接
			channel.connect(new InetSocketAddress(host,port));
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	/**
	 * 验证连接是否建立
	 */
	public void finishConnect(){
		try {
			while(!channel.finishConnect()){
				// nothing to do,wait connect
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	/**
	 * 验证当前连接是否可用
	 */
	public void isConnected(){
		try {
			if(channel == null || !channel.isConnected())
				throw new IOException("channel is broken");
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	/**
	 * 配置客户端通道对等的Socket
	 */
	public void setSocket(){
		try {
			if(socket != null ){
				//设置socket 读取的超时时间5秒
				//socket.setSoTimeout(5000);
				//设置小数据包不再组合成大包发送,也不再等待前一个数据包返回确认消息
				socket.setTcpNoDelay(true);
				//设置如果客户端Socket关闭了,未发送的包直接丢弃
				socket.setSoLinger(true, 0);
			}
		} catch (Exception e) {
			// TODO: handle exception
		}
	}
	
	public void write(String data) {
		buffer.clear();
		buffer.put(data.getBytes());
		buffer.flip();
		try {
			// write并不一定能一次将buffer中的数据都写入 所以这里要多次写入
			// 当多个线程同时调用同一个通道的写方法时,只有一个线程能工作,其他现在则会阻塞
			while(buffer.hasRemaining()){
				channel.write(buffer);
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	public void read(){
		try {
			buffer.clear();
			// read方法并不阻塞,如果有数据读入返回读入的字节数,没有数据读入返回0 ,遇到流的末尾返回-1
			// 当然这里和Socket和ServerSocket通信一样 也会存在消息无边界的问题 我们这里就采取简单的读取一次作为示例
			System.out.println("read begin");
			channel.read(buffer);
		/*	while(buffer.hasRemaining() && channel.read(buffer) != -1){
				printBuffer(buffer);
			}*/
			buffer.flip();
			printBuffer(buffer);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	/**
	 * 输出buffer中的数据
	 * @param buffer
	 */
	public void printBuffer(ByteBuffer buffer){
		while(buffer.hasRemaining()){
			System.out.print((char)buffer.get());
		}
		System.out.println("");
		System.out.println("****** Read end ******");
	}
	
	/**
	 * 判断通道是否打开
	 * @return
	 */
	public boolean isChannelOpen(){
		try {
			return channel.finishConnect() ? true : false;
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return false;
	}
	
	/**
	 * 关闭通道
	 */
	public void closeChannel(){
		if(channel != null){
			try {
				channel.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
	
	public static void main(String[] args) {
		// TODO Auto-generated method stub
	//	client(DEFAULT_HOST,DEFAULT_PORT);
		SocketChannel_Client client = new SocketChannel_Client();
		client.finishConnect();
		System.out.println("connect success");
		client.write("Hello World");
		System.out.println("client write end");
		client.read();
		sleep(15000);
		System.out.println("client exit");
 
	}
	
	public static void sleep(long time){
		try {
			Thread.sleep(time);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}
           

服务端代码示例:

/**
 * @author yujie.wang
 * ServerSocketChannel 测试用例
 */
public class ServerSocketChannel_Server {

	private final static int DEFAULT_PORT = 4567;
	
	private ServerSocketChannel channel;
	
	private Selector selector;
	
	private ServerSocket serverSocket;
	
	public ServerSocketChannel_Server(){
		this(DEFAULT_PORT);
	}
	
	public ServerSocketChannel_Server(int port){
		init(port);
	}
	
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		ServerSocketChannel_Server server = new ServerSocketChannel_Server();
		server.selector();
		System.out.println("server exit");
	}
	
	public void init(int port){
		try {
			//打开一个服务端通道
			channel = ServerSocketChannel.open();
			//获得对等的ServerSocket对象
			serverSocket = channel.socket();
			//将服务端ServerSocket绑定到指定端口
			serverSocket.bind(new InetSocketAddress(port));
			System.out.println("Server listening on port: "+ port);
			//将通道设置为非阻塞模式
			channel.configureBlocking(false);
			//打开一个选择器
			selector = Selector.open();
			//将通道注册到打开的选择器上
			channel.register(selector, SelectionKey.OP_ACCEPT);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	public void selector(){
		try {
			while(true){
				System.out.println("begin to select");
				//select()方法会阻塞,直到有准备就绪的通道有准备好的操作;或者当前线程中断该方法也会返回
				//这里的返回值不是选择器中已选择键集合中键的数量,而是从上一次select()方法调用到这次调用期间进入就绪状态通道的数量
				int readyKeyCount = selector.select();
				if(readyKeyCount <= 0){
					continue;
				}
				System.out.println("ok select readyCount: "+ readyKeyCount);
				//获得已选择键的集合这个集合中包含了 新准备就绪的通道和上次调用select()方法已经存在的就绪通道
				Set<SelectionKey> set = selector.selectedKeys();
				Iterator<SelectionKey> iterator = set.iterator();
				while(iterator.hasNext()){
					SelectionKey key = iterator.next();
					//通过调用remove将这个键key从已选择键的集合中删除
					iterator.remove();
					if(key.isAcceptable()){
						handleAccept(key);
					}else if(key.isReadable()){
						handleRead(key);
					}else if(key.isWritable()){
						handleWrite(key,"Hello World");
					}
				}
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	/**
	 * 处理客户端连接事件
	 * @param key
	 * @param selector
	 */
	public void handleAccept(SelectionKey key){
		try {
			//因为能注册SelectionKey.OP_ACCEPT事件的只有 ServerSocketChannel通道,
			//所以这里可以直接转换成ServerSocketChannel
			ServerSocketChannel channel = (ServerSocketChannel)key.channel();
			//获得客户端的SocketChannel对象 ,accept这里不会阻塞,如果没有连接到来,这里会返回null
			SocketChannel client = channel.accept();
			System.out.println("Accepted Connected from: "+ client);
			//将客户端socketChannel设置为非阻塞模式
			client.configureBlocking(false);
			//为该客户端socket分配一个ByteBuffer
			ByteBuffer buffer = ByteBuffer.allocate(50);
			client.register(selector, SelectionKey.OP_READ, buffer);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	/**
	 * 处理读取数据事件
	 * @param key
	 */
	public void handleRead(SelectionKey key){
		try {
			//从键中获得相应的客户端socketChannel
			SocketChannel channel = (SocketChannel)key.channel();
			//获得与客户端socketChannel关联的buffer
			ByteBuffer buffer = (ByteBuffer)key.attachment();
			buffer.clear();
			//将数据读取到buffer中,这里read方法不会阻塞
			//有数据返回读取的字节数,没有数据返回0,遇到流的末尾则返回-1
			//这里为了避免消息的无边界 性 ,所以只读取一次数据
			int count = channel.read(buffer);
			System.out.println("read count:"+ count);
			buffer.flip();
			//输出数据
			printBuffer(buffer);
			buffer.clear();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	/**
	 * 处理写入数据的时间
	 * @param key
	 * @param data
	 */
	public void handleWrite(SelectionKey key,String data){
		try {
			SocketChannel channel = (SocketChannel)key.channel();
			ByteBuffer buffer = (ByteBuffer)key.attachment();
			buffer.clear();
			buffer.put(data.getBytes());
			buffer.flip();
			while(buffer.hasRemaining()){
				channel.write(buffer);
			}
			buffer.clear();
		} catch (Exception e) {
			// TODO: handle exception
		}
	}

	public static void printBuffer(ByteBuffer buffer){
		while(buffer.hasRemaining()){
			System.out.println("positon: "  + buffer.position()+ " limit:"+ buffer.limit());
			System.out.print((char)buffer.get());
		}
		System.out.println("");
		System.out.println("****** Read end ******");
		System.out.println("");
	}
	
}
           

继续阅读