在前面的文章中我们讲过Socket的读是阻塞模式,并且读写都是一个字节或者几个字节,为了提高响应性能往往需要在服务端为每一个请求分配一个线程。后来为了解决传统Socket的这种阻塞低效方式,在jdk1.4之后引入了New I/o(NIO)。
NIO网络编程中比较重要的概念是缓冲区、通道、选择器,NIO它是基于缓存区的操作,一次能读写一个或者多个数据块,通道可以以阻塞(blocking)或非阻塞(nonblocking)模式运行。非阻塞模式的通道永远不会让调用的线程休眠。请求的操作要么立即完成,要么返回一个结果表明未进行任何操作。在NIO通道的读写两端能直接操作的就是ByteBuffer,缓冲区是可以在不同通道或者同一个通道的读写端共用的。与缓冲区不同,通道不能被重复使用,一个打开的通道即代表与一个特定I/O服务的特定连接并封装该连接的状态。当通道关闭时,那个连接会丢失,然后通道将不再连接任何东西。
Socket和SocketChannel类封装点对点、有序的网络连接,SocketChannel扮演客户端发起同一个监听服务器的连接。直到连接成功,它才能收到数据并且只会从连接到的地址接收,每个SocketChannel对象创建时都是同一个对等的java.net.Socket对象串联的。
而在新创建的SocketChannel上调用socket( )方法能返回它对等的Socket对象;在该Socket上调用getChannel( )方法则能返回最初的那个SocketChannel。
如果选择使用通过在对等Socket对象上调用connect( )方法与服务端建立连接,那么线程在连接建立好或超时过期之前都将保持阻塞。如果您选择通过在通道上直接调用connect( )方法来建立连接并且通道处于阻塞模式(默认模式),那么使用传统Socket连接过程实际上是一样的。在SocketChannel上并没有一种connect( )方法可以让您指定超时(timeout)值,当connect( )方法在非阻塞模式下被调用时SocketChannel提供并发连接:它发起对请求地址的连接并且立即返回值。如果返回值是true,说明连接立即建立了(这可能是本地环回连接);如果连接不能立即建立,connect( )方法会返回false且并发地继续连接建立过程。
而在服务端ServerSocketChannel扮演者服务端通道的角色,它负责监听服务器上的一个连接,在创建服务端通道的时候需要调用对等的ServerSocket对象绑定到指定的端口上。在传统的基于流的Socket网络编程中,服务端为每一个请求创建一个线程用于读写数据,而使用ServerSocketChannel在服务端编程,我们往往配合Selector选择器使用,在服务端我们将特定的Accpet、Read、Write事件注册到选择器上,由选择器帮我检查操作系统内核是否可读和可写,如有对应的事件满足要求,选择器会通知调用者线程。
相对于传统Socket编程,使用基于缓冲区的NIO编程在如下几点不会阻塞调用者线程:
(1)客户端connect( )方法不会阻塞
(2)服务端accept()方法不会阻塞
(3)Socket的读read()方法不会阻塞
下面我展示一个常用的NIO网络编程的客户端和服务端的代码示例,代码是经过自己测试可用的,其中我添加了大量的注释用于说明代码的意图:
/**
* @author yujie.wang
* SocketChannel 客户端代码测试
*/
public class SocketChannel_Client {
private final static String DEFAULT_HOST = "127.0.0.1";
private final static int DEFAULT_PORT = 4567;
private SocketChannel channel;
private Socket socket;
//分配一个大小为50字节的缓冲区 用于客户端通道的读写
private ByteBuffer buffer = ByteBuffer.allocate(50);
public SocketChannel_Client(){
this(DEFAULT_HOST, DEFAULT_PORT);
}
public SocketChannel_Client(String host, int port){
init(host,port);
}
/**
* 打开通道并设置对等的客户端socket对象
* 建立与服务端通道的连接
* @param host
* @param port
*/
public void init(String host, int port){
try {
//打开一个客户端通道,同时当前通道并没有与服务端通道建立连接
channel = SocketChannel.open();
//获得对等的客户端socket
socket = channel.socket();
//配置客户端socket
setSocket();
//将通道设置为非阻塞工作方式
channel.configureBlocking(false);
//异步连接,发起连接之后就立即返回
//返回true,连接已经建立
//返回false,后续继续建立连接
channel.connect(new InetSocketAddress(host,port));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 验证连接是否建立
*/
public void finishConnect(){
try {
while(!channel.finishConnect()){
// nothing to do,wait connect
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 验证当前连接是否可用
*/
public void isConnected(){
try {
if(channel == null || !channel.isConnected())
throw new IOException("channel is broken");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 配置客户端通道对等的Socket
*/
public void setSocket(){
try {
if(socket != null ){
//设置socket 读取的超时时间5秒
//socket.setSoTimeout(5000);
//设置小数据包不再组合成大包发送,也不再等待前一个数据包返回确认消息
socket.setTcpNoDelay(true);
//设置如果客户端Socket关闭了,未发送的包直接丢弃
socket.setSoLinger(true, 0);
}
} catch (Exception e) {
// TODO: handle exception
}
}
public void write(String data) {
buffer.clear();
buffer.put(data.getBytes());
buffer.flip();
try {
// write并不一定能一次将buffer中的数据都写入 所以这里要多次写入
// 当多个线程同时调用同一个通道的写方法时,只有一个线程能工作,其他现在则会阻塞
while(buffer.hasRemaining()){
channel.write(buffer);
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void read(){
try {
buffer.clear();
// read方法并不阻塞,如果有数据读入返回读入的字节数,没有数据读入返回0 ,遇到流的末尾返回-1
// 当然这里和Socket和ServerSocket通信一样 也会存在消息无边界的问题 我们这里就采取简单的读取一次作为示例
System.out.println("read begin");
channel.read(buffer);
/* while(buffer.hasRemaining() && channel.read(buffer) != -1){
printBuffer(buffer);
}*/
buffer.flip();
printBuffer(buffer);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 输出buffer中的数据
* @param buffer
*/
public void printBuffer(ByteBuffer buffer){
while(buffer.hasRemaining()){
System.out.print((char)buffer.get());
}
System.out.println("");
System.out.println("****** Read end ******");
}
/**
* 判断通道是否打开
* @return
*/
public boolean isChannelOpen(){
try {
return channel.finishConnect() ? true : false;
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return false;
}
/**
* 关闭通道
*/
public void closeChannel(){
if(channel != null){
try {
channel.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String[] args) {
// TODO Auto-generated method stub
// client(DEFAULT_HOST,DEFAULT_PORT);
SocketChannel_Client client = new SocketChannel_Client();
client.finishConnect();
System.out.println("connect success");
client.write("Hello World");
System.out.println("client write end");
client.read();
sleep(15000);
System.out.println("client exit");
}
public static void sleep(long time){
try {
Thread.sleep(time);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
服务端代码示例:
/**
* @author yujie.wang
* ServerSocketChannel 测试用例
*/
public class ServerSocketChannel_Server {
private final static int DEFAULT_PORT = 4567;
private ServerSocketChannel channel;
private Selector selector;
private ServerSocket serverSocket;
public ServerSocketChannel_Server(){
this(DEFAULT_PORT);
}
public ServerSocketChannel_Server(int port){
init(port);
}
public static void main(String[] args) {
// TODO Auto-generated method stub
ServerSocketChannel_Server server = new ServerSocketChannel_Server();
server.selector();
System.out.println("server exit");
}
public void init(int port){
try {
//打开一个服务端通道
channel = ServerSocketChannel.open();
//获得对等的ServerSocket对象
serverSocket = channel.socket();
//将服务端ServerSocket绑定到指定端口
serverSocket.bind(new InetSocketAddress(port));
System.out.println("Server listening on port: "+ port);
//将通道设置为非阻塞模式
channel.configureBlocking(false);
//打开一个选择器
selector = Selector.open();
//将通道注册到打开的选择器上
channel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void selector(){
try {
while(true){
System.out.println("begin to select");
//select()方法会阻塞,直到有准备就绪的通道有准备好的操作;或者当前线程中断该方法也会返回
//这里的返回值不是选择器中已选择键集合中键的数量,而是从上一次select()方法调用到这次调用期间进入就绪状态通道的数量
int readyKeyCount = selector.select();
if(readyKeyCount <= 0){
continue;
}
System.out.println("ok select readyCount: "+ readyKeyCount);
//获得已选择键的集合这个集合中包含了 新准备就绪的通道和上次调用select()方法已经存在的就绪通道
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> iterator = set.iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
//通过调用remove将这个键key从已选择键的集合中删除
iterator.remove();
if(key.isAcceptable()){
handleAccept(key);
}else if(key.isReadable()){
handleRead(key);
}else if(key.isWritable()){
handleWrite(key,"Hello World");
}
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 处理客户端连接事件
* @param key
* @param selector
*/
public void handleAccept(SelectionKey key){
try {
//因为能注册SelectionKey.OP_ACCEPT事件的只有 ServerSocketChannel通道,
//所以这里可以直接转换成ServerSocketChannel
ServerSocketChannel channel = (ServerSocketChannel)key.channel();
//获得客户端的SocketChannel对象 ,accept这里不会阻塞,如果没有连接到来,这里会返回null
SocketChannel client = channel.accept();
System.out.println("Accepted Connected from: "+ client);
//将客户端socketChannel设置为非阻塞模式
client.configureBlocking(false);
//为该客户端socket分配一个ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(50);
client.register(selector, SelectionKey.OP_READ, buffer);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 处理读取数据事件
* @param key
*/
public void handleRead(SelectionKey key){
try {
//从键中获得相应的客户端socketChannel
SocketChannel channel = (SocketChannel)key.channel();
//获得与客户端socketChannel关联的buffer
ByteBuffer buffer = (ByteBuffer)key.attachment();
buffer.clear();
//将数据读取到buffer中,这里read方法不会阻塞
//有数据返回读取的字节数,没有数据返回0,遇到流的末尾则返回-1
//这里为了避免消息的无边界 性 ,所以只读取一次数据
int count = channel.read(buffer);
System.out.println("read count:"+ count);
buffer.flip();
//输出数据
printBuffer(buffer);
buffer.clear();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 处理写入数据的时间
* @param key
* @param data
*/
public void handleWrite(SelectionKey key,String data){
try {
SocketChannel channel = (SocketChannel)key.channel();
ByteBuffer buffer = (ByteBuffer)key.attachment();
buffer.clear();
buffer.put(data.getBytes());
buffer.flip();
while(buffer.hasRemaining()){
channel.write(buffer);
}
buffer.clear();
} catch (Exception e) {
// TODO: handle exception
}
}
public static void printBuffer(ByteBuffer buffer){
while(buffer.hasRemaining()){
System.out.println("positon: " + buffer.position()+ " limit:"+ buffer.limit());
System.out.print((char)buffer.get());
}
System.out.println("");
System.out.println("****** Read end ******");
System.out.println("");
}
}