天天看点

NIO socket 的简单连接池

      在最近的项目中,需要写一个通过 socket 与底层服务器通信的模块。在设计中,请求对象被封装为 xxxRequest,返回消息被封装为 xxxResponse。由于 socket 编程的开发经验较少,一开始我使用了短连接的方式:每个请求都新建一个 socket 进行通信。由于每个 socket 只进行一次读写,这大大浪费了系统资源。

      于是考虑使用长连接:系统共用一个 client socket,并对 send 操作进行加锁。结果在处理并发的时候,各种慢,各种等待。没有办法,只好考虑使用连接池:预先创建多个 client socket 放入连接池,需要发送请求时从连接池获取一个 socket,请求完成后再将其放回连接池中。下面是一个简单的实现。

        // Server host, taken from the project's GlobalNames settings.
        private  static String IP=GlobalNames.industryIP;

 // Server port, parsed from the GlobalNames setting string.
 private  static int PORT =Integer.parseInt(GlobalNames.industryPort);

 // Number of pool slots; starts at 10 and is incremented by getConnection() when it adds a connection.
 private static  int CONNECTION_POOL_SIZE = 10;

 // The single pool instance; created by init().
 private static NIOConnectionPool self = null;

 private Hashtable<Integer, SocketChannel> socketPool = null; // connection pool: slot index -> channel

 private boolean[] socketStatusArray = null; // per-slot status (true = in use, false = idle)

 // Selector most recently opened by allocateSocketChannel(); exposed through getSelector().
 private static Selector selector  = null;

 // Server address; rebuilt from IP and PORT on every allocateSocketChannel() call.
 private static InetSocketAddress SERVER_ADDRESS = null;

 public static synchronized void init() throws Exception {

  self = new NIOConnectionPool();

  self.socketPool = new Hashtable<Integer, SocketChannel>();

  self.socketStatusArray = new boolean[CONNECTION_POOL_SIZE];

  buildConnectionPool();

 }

 public synchronized static void buildConnectionPool() throws Exception {

  if (self == null) {

   init();

  }

  for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {

   SocketChannel client = allocateSocketChannel();

   self.socketPool.put(new Integer(i), client);

   self.socketStatusArray[i] = false;

  }

 }

 public static SocketChannel getConnection() throws Exception {

  if (self == null)

   init();

  int i = 0;

  for (i = 0; i < CONNECTION_POOL_SIZE; i++) {

   if (!self.socketStatusArray[i]) {

    self.socketStatusArray[i] = true;

    break;

   }

  }

  if (i < CONNECTION_POOL_SIZE) {

   return self.socketPool.get(new Integer(i));

  } else {

  //目前连接池无可用连接时只是简单的新建一个连接

   SocketChannel newClient = allocateSocketChannel();

   CONNECTION_POOL_SIZE++;

   self.socketPool.put(CONNECTION_POOL_SIZE, newClient);

   return newClient;

  }

 }

 public static SocketChannel rebuildConnection(SocketChannel socket)

   throws Exception {

  if (self == null) {

   init();

  }

  SocketChannel newSocket = null;

  try {

   for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {

    if (self.socketPool.get(new Integer(i)) == socket) {

     newSocket = allocateSocketChannel();

     self.socketPool.put(new Integer(i), newSocket);

     self.socketStatusArray[i] = true;

    }

   }

  } catch (Exception e) {

   System.out.println("重建连接失败!");

   throw new RuntimeException(e);

  }

  return newSocket;

 }

 public static void releaseConnection(SocketChannel socket) throws Exception {

  if (self == null) {

   init();

  }

  for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {

   if (self.socketPool.get(new Integer(i)) == socket) {

    self.socketStatusArray[i] = false;

    break;

   }

  }

 }

 public synchronized static void releaseAllConnection() throws Exception {

  if (self == null)

   init();

  // 关闭所有连接

  SocketChannel socket = null;

  for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {

   socket = self.socketPool.get(new Integer(i));

   try {

    socket.close();

    self.socketStatusArray[i] = false;

   } catch (Exception e) {

    e.printStackTrace();

   }

  }

 }

 public static SocketChannel allocateSocketChannel(){

   SERVER_ADDRESS = new InetSocketAddress(  

                IP, PORT);  

  SocketChannel socketChannel = null;

     SocketChannel client = null;

     try{

     socketChannel = SocketChannel.open();  

        socketChannel.configureBlocking(false);  

        selector = Selector.open();  

        socketChannel.register(selector, SelectionKey.OP_CONNECT);  

        socketChannel.connect(SERVER_ADDRESS);

        Set<SelectionKey> selectionKeys;  

        Iterator<SelectionKey> iterator;  

        SelectionKey selectionKey;

        selector.select();  

        selectionKeys = selector.selectedKeys();  

        iterator = selectionKeys.iterator();  

        while (iterator.hasNext()) {  

            selectionKey = iterator.next();  

            if (selectionKey.isConnectable()) {  

                client = (SocketChannel) selectionKey.channel();  

                if (client.isConnectionPending()) {  

                    client.finishConnect();

                    client.register(selector, SelectionKey.OP_WRITE);  

                    break;

                }

            }

        }

 }catch(Exception e){

  e.printStackTrace();

 }

 return client;

  }

 /**
  * Returns the pool's static selector field.
  *
  * The field is reassigned by every allocateSocketChannel() call, so this is
  * the selector opened by the most recent allocation, or null if no
  * allocation has run yet.
  *
  * @return the current value of the static selector field
  */
 public static Selector getSelector() {

  return selector;

 }

使用连接池进行通信:

     // Capacity in bytes of the outgoing buffer (8 * 4096).
     private static int BLOCK = 8*4096;  

     // Outgoing buffer that send() fills with the request fields; static, so shared by all instances.
     private static ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);  

     // 4-byte buffer: first response header field read by send().
     private static ByteBuffer protocalNum = ByteBuffer.allocate(4);

     // 4-byte buffer: second response header field read by send().
     private static ByteBuffer functionNum = ByteBuffer.allocate(4);

     // 4-byte buffer: third response header field; send() reads the message length from it as an int.
     private static ByteBuffer messageLen = ByteBuffer.allocate(4);

     // Response body buffer; allocated by send() for each response.
     // NOTE(review): these buffers are static and send() does not synchronise on them —
     // confirm that instances are never used from several threads at once.
     private static ByteBuffer receivebuffer = null;

     // Channel obtained from NIOConnectionPool in the constructor.
     private  SocketChannel client = null;

     // Selector obtained from NIOConnectionPool.getSelector() in the constructor.
     private Selector selector = null;

     // True until send() has read the response.
     private boolean readable = true;

     // True until send() has written the request.
     private boolean writable = true;

     public NIOSocketBackUp() throws Exception{

      client = NIOConnectionPool.getConnection();

      selector = NIOConnectionPool.getSelector();

     }

     public String send(ServiceRequest request) throws Exception {   

         Set<SelectionKey> selectionKeys;  

         Iterator<SelectionKey> iterator;  

         SelectionKey selectionKey;  

         int count=0;  

         boolean flag = true;

         String receiveText="";  

          while (flag) {  

             selector.select();  

             //返回此选择器的已选择键集。  

             selectionKeys = selector.selectedKeys();  

             iterator = selectionKeys.iterator();  

             while (iterator.hasNext()) {  

                 selectionKey = iterator.next();  

                 if (selectionKey.isWritable() && (writable)) {  

                         sendbuffer.clear();  

                         sendbuffer.put(request.getProtocalNum());

                         sendbuffer.put(request.getFunctionNum());

                         sendbuffer.put(request.getMessageLen());

                         sendbuffer.put(request.getXmlbytes());

                         sendbuffer.flip();  

                         client.write(sendbuffer);  

                         client.register(selector, SelectionKey.OP_READ);  

                         writable = false;

                 } else if (selectionKey.isReadable() && (readable) ) {  

                     protocalNum.clear();

                     functionNum.clear();

                     messageLen.clear();

                     count=client.read(protocalNum);  

                     count=client.read(functionNum);

                     count=client.read(messageLen);

                     messageLen.rewind();

                     int length = messageLen.asIntBuffer().get(0);

                     receivebuffer = ByteBuffer.allocate(length-12);

                     receivebuffer.clear();

                     //read方式竟然不阻塞

                     int total=0;

                     while(total!=(length-12)){

                       count=client.read(receivebuffer);

                       total+=count;

                     }

                     client.register(selector, SelectionKey.OP_WRITE);  

                     receiveText = new String(receivebuffer.array(),"GBK");

                     flag = false;

                     readable = false;

                     break;

                 }

             }  

         }

      NIOConnectionPool.releaseConnection(client);

         return receiveText.trim();

     }  

继续阅读