天天看點

NIO socket 的簡單連接配接池

      在最近的項目中,需要寫一個socket 與 底層伺服器通信的子產品。在設計中,請求對象被封裝 xxxRequest,消息傳回被封裝為 xxxResponse. 由于socket的程式設計開發經驗少,一開始我使用了短連接配接的方式,每個請求建立一個socket通信,由于每個socket隻進行一次讀寫,這大大浪費了系統資源。

      于是考慮使用長連接配接,系統公用一個client socket 并對send 操作進行加鎖,結果在處理并發的時候,各種慢,各種等待。沒有辦法,考慮使用兩節池,預先建立多個 client socket 放入 連接配接池,需要發送請求時從連接配接池擷取一個socket,完成請求時放入連接配接池中。下面是一個簡單的實作。

        private  static String IP=GlobalNames.industryIP;

 private  static int PORT =Integer.parseInt(GlobalNames.industryPort);

 private static  int CONNECTION_POOL_SIZE = 10;

 private static NIOConnectionPool self = null;

 private Hashtable<Integer, SocketChannel> socketPool = null; // 連接配接池

 private boolean[] socketStatusArray = null; // 連接配接的狀态(true-被占用,false-空閑)

 private static Selector selector  = null;

 private static InetSocketAddress SERVER_ADDRESS = null;

 public static synchronized void init() throws Exception {

  self = new NIOConnectionPool();

  self.socketPool = new Hashtable<Integer, SocketChannel>();

  self.socketStatusArray = new boolean[CONNECTION_POOL_SIZE];

  buildConnectionPool();

 }

 public synchronized static void buildConnectionPool() throws Exception {

  if (self == null) {

   init();

  }

  for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {

   SocketChannel client = allocateSocketChannel();

   self.socketPool.put(new Integer(i), client);

   self.socketStatusArray[i] = false;

  }

 }

 public static SocketChannel getConnection() throws Exception {

  if (self == null)

   init();

  int i = 0;

  for (i = 0; i < CONNECTION_POOL_SIZE; i++) {

   if (!self.socketStatusArray[i]) {

    self.socketStatusArray[i] = true;

    break;

   }

  }

  if (i < CONNECTION_POOL_SIZE) {

   return self.socketPool.get(new Integer(i));

  } else {

  //目前連接配接池無可用連接配接時隻是簡單的建立一個連接配接

   SocketChannel newClient = allocateSocketChannel();

   CONNECTION_POOL_SIZE++;

   self.socketPool.put(CONNECTION_POOL_SIZE, newClient);

   return newClient;

  }

 }

 public static SocketChannel rebuildConnection(SocketChannel socket)

   throws Exception {

  if (self == null) {

   init();

  }

  SocketChannel newSocket = null;

  try {

   for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {

    if (self.socketPool.get(new Integer(i)) == socket) {

     newSocket = allocateSocketChannel();

     self.socketPool.put(new Integer(i), newSocket);

     self.socketStatusArray[i] = true;

    }

   }

  } catch (Exception e) {

   System.out.println("重建連接配接失敗!");

   throw new RuntimeException(e);

  }

  return newSocket;

 }

 public static void releaseConnection(SocketChannel socket) throws Exception {

  if (self == null) {

   init();

  }

  for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {

   if (self.socketPool.get(new Integer(i)) == socket) {

    self.socketStatusArray[i] = false;

    break;

   }

  }

 }

 public synchronized static void releaseAllConnection() throws Exception {

  if (self == null)

   init();

  // 關閉所有連接配接

  SocketChannel socket = null;

  for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {

   socket = self.socketPool.get(new Integer(i));

   try {

    socket.close();

    self.socketStatusArray[i] = false;

   } catch (Exception e) {

    e.printStackTrace();

   }

  }

 }

 public static SocketChannel allocateSocketChannel(){

   SERVER_ADDRESS = new InetSocketAddress(  

                IP, PORT);  

  SocketChannel socketChannel = null;

     SocketChannel client = null;

     try{

     socketChannel = SocketChannel.open();  

        socketChannel.configureBlocking(false);  

        selector = Selector.open();  

        socketChannel.register(selector, SelectionKey.OP_CONNECT);  

        socketChannel.connect(SERVER_ADDRESS);

        Set<SelectionKey> selectionKeys;  

        Iterator<SelectionKey> iterator;  

        SelectionKey selectionKey;

        selector.select();  

        selectionKeys = selector.selectedKeys();  

        iterator = selectionKeys.iterator();  

        while (iterator.hasNext()) {  

            selectionKey = iterator.next();  

            if (selectionKey.isConnectable()) {  

                client = (SocketChannel) selectionKey.channel();  

                if (client.isConnectionPending()) {  

                    client.finishConnect();

                    client.register(selector, SelectionKey.OP_WRITE);  

                    break;

                }

            }

        }

 }catch(Exception e){

  e.printStackTrace();

 }

 return client;

  }

 public static Selector getSelector() {

  return selector;

 }

使用連接配接池進行通信:

     private static int BLOCK = 8*4096;  

     private static ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);  

     private static ByteBuffer protocalNum = ByteBuffer.allocate(4);

     private static ByteBuffer functionNum = ByteBuffer.allocate(4);

     private static ByteBuffer messageLen = ByteBuffer.allocate(4);

     private static ByteBuffer receivebuffer = null;

     private  SocketChannel client = null;

     private Selector selector = null;

     private boolean readable = true;

     private boolean writable = true;

     public NIOSocketBackUp() throws Exception{

      client = NIOConnectionPool.getConnection();

      selector = NIOConnectionPool.getSelector();

     }

     public String send(ServiceRequest request) throws Exception {   

         Set<SelectionKey> selectionKeys;  

         Iterator<SelectionKey> iterator;  

         SelectionKey selectionKey;  

         int count=0;  

         boolean flag = true;

         String receiveText="";  

          while (flag) {  

             selector.select();  

             //傳回此選擇器的已選擇鍵集。  

             selectionKeys = selector.selectedKeys();  

             iterator = selectionKeys.iterator();  

             while (iterator.hasNext()) {  

                 selectionKey = iterator.next();  

                 if (selectionKey.isWritable() && (writable)) {  

                         sendbuffer.clear();  

                         sendbuffer.put(request.getProtocalNum());

                         sendbuffer.put(request.getFunctionNum());

                         sendbuffer.put(request.getMessageLen());

                         sendbuffer.put(request.getXmlbytes());

                         sendbuffer.flip();  

                         client.write(sendbuffer);  

                         client.register(selector, SelectionKey.OP_READ);  

                         writable = false;

                 } else if (selectionKey.isReadable() && (readable) ) {  

                     protocalNum.clear();

                     functionNum.clear();

                     messageLen.clear();

                     count=client.read(protocalNum);  

                     count=client.read(functionNum);

                     count=client.read(messageLen);

                     messageLen.rewind();

                     int length = messageLen.asIntBuffer().get(0);

                     receivebuffer = ByteBuffer.allocate(length-12);

                     receivebuffer.clear();

                     //read方式竟然不阻塞

                     int total=0;

                     while(total!=(length-12)){

                       count=client.read(receivebuffer);

                       total+=count;

                     }

                     client.register(selector, SelectionKey.OP_WRITE);  

                     receiveText = new String(receivebuffer.array(),"GBK");

                     flag = false;

                     readable = false;

                     break;

                 }

             }  

         }

      NIOConnectionPool.releaseConnection(client);

         return receiveText.trim();

     }  

繼續閱讀