在最近的項目中,需要寫一個透過 socket 與底層伺服器通信的模組。在設計中,請求對象被封裝為 xxxRequest,傳回的消息被封裝為 xxxResponse。由於缺乏 socket 程式設計的開發經驗,一開始我使用了短連接的方式,每個請求建立一個 socket 通信;由於每個 socket 只進行一次讀寫,這大大浪費了系統資源。
於是考慮使用長連接,系統共用一個 client socket 並對 send 操作進行加鎖,結果在處理並發的時候,各種慢,各種等待。沒有辦法,只好考慮使用連接池:預先建立多個 client socket 放入連接池,需要發送請求時從連接池取得一個 socket,完成請求後再放回連接池中。下面是一個簡單的實作。
// Server host, taken from GlobalNames.industryIP.
private static String IP=GlobalNames.industryIP;
// Server port, parsed from the string GlobalNames.industryPort.
private static int PORT =Integer.parseInt(GlobalNames.industryPort);
// Number of pooled slots; starts at 10 and is incremented by getConnection()
// when no idle slot is found, so it is deliberately not final.
private static int CONNECTION_POOL_SIZE = 10;
// The single pool instance; created by init().
private static NIOConnectionPool self = null;
private Hashtable<Integer, SocketChannel> socketPool = null; // connection pool, keyed by slot index
private boolean[] socketStatusArray = null; // per-slot status (true = in use, false = idle)
// Reassigned by every allocateSocketChannel() call, so it always refers to the
// selector opened for the most recently allocated channel.
private static Selector selector = null;
// Rebuilt from IP/PORT on every allocateSocketChannel() call.
private static InetSocketAddress SERVER_ADDRESS = null;
/**
 * Creates a fresh pool instance, publishes it through {@code self}, and fills it
 * with {@code CONNECTION_POOL_SIZE} connections via {@link #buildConnectionPool()}.
 *
 * @throws Exception if building the connections fails
 */
public static synchronized void init() throws Exception {
    final NIOConnectionPool fresh = new NIOConnectionPool();
    // Publish first: buildConnectionPool() works through the static reference.
    self = fresh;
    fresh.socketPool = new Hashtable<Integer, SocketChannel>();
    fresh.socketStatusArray = new boolean[CONNECTION_POOL_SIZE];
    buildConnectionPool();
}
/**
 * (Re)fills every pool slot with a newly allocated connection and marks all
 * slots idle.
 *
 * <p>If the pool has not been created yet this delegates to {@link #init()},
 * which itself builds the pool; the method then returns instead of building a
 * second set of connections on top of the first (which leaked the first set).
 * Any channel displaced from a slot is closed so its socket is not leaked.
 *
 * @throws Exception if the pool cannot be initialised or a slot cannot be stored
 */
public synchronized static void buildConnectionPool() throws Exception {
    if (self == null) {
        // init() already calls buildConnectionPool(); do not build twice.
        init();
        return;
    }
    // Keep the status array in step with the slot count; every entry is reset
    // to idle below anyway, so a fresh array loses no information.
    if (self.socketStatusArray == null
            || self.socketStatusArray.length != CONNECTION_POOL_SIZE) {
        self.socketStatusArray = new boolean[CONNECTION_POOL_SIZE];
    }
    for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
        SocketChannel client = allocateSocketChannel();
        SocketChannel previous = self.socketPool.put(Integer.valueOf(i), client);
        self.socketStatusArray[i] = false;
        if (previous != null) {
            try {
                previous.close();
            } catch (Exception e) {
                // Best effort: the slot already holds its replacement.
                e.printStackTrace();
            }
        }
    }
}
/**
 * Hands out an idle pooled connection, marking its slot as in use. When every
 * slot is busy the pool grows by one slot holding a newly allocated connection.
 *
 * <p>The method is synchronized on the class, like the other pool mutators, so
 * two callers cannot claim the same slot. When growing, the new connection is
 * stored at the next free index and the status array is enlarged with it, so
 * the connection can later be found by {@code releaseConnection} and the slot
 * scan never indexes past the status array.
 *
 * @return the channel stored in the claimed slot
 * @throws Exception if the pool cannot be initialised or the new connection
 *         cannot be stored
 */
public static synchronized SocketChannel getConnection() throws Exception {
    if (self == null) {
        init();
    }
    for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
        if (!self.socketStatusArray[i]) {
            self.socketStatusArray[i] = true;
            return self.socketPool.get(Integer.valueOf(i));
        }
    }
    // No idle connection: simply create one more and append it to the pool.
    int slot = CONNECTION_POOL_SIZE;
    SocketChannel newClient = allocateSocketChannel();
    // Store first: if this throws, size and status array are still untouched.
    self.socketPool.put(Integer.valueOf(slot), newClient);
    boolean[] grown = new boolean[slot + 1];
    System.arraycopy(self.socketStatusArray, 0, grown, 0,
            Math.min(self.socketStatusArray.length, slot));
    grown[slot] = true; // the caller receives it, so it starts out in use
    self.socketStatusArray = grown;
    CONNECTION_POOL_SIZE = slot + 1;
    return newClient;
}
/**
 * Replaces a pooled connection with a newly allocated one in the same slot and
 * keeps that slot marked as in use.
 *
 * <p>The replaced channel is closed once its slot holds the new connection, so
 * the old socket is not leaked. The search stops at the first matching slot.
 * Synchronized on the class so it cannot interleave with other pool mutators.
 *
 * @param socket the pooled channel to replace (compared by identity)
 * @return the replacement channel, or {@code null} if {@code socket} is not in the pool
 * @throws Exception if the pool cannot be initialised
 * @throws RuntimeException wrapping any failure while replacing the slot
 */
public static synchronized SocketChannel rebuildConnection(SocketChannel socket)
throws Exception {
    if (self == null) {
        init();
    }
    SocketChannel newSocket = null;
    try {
        for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
            if (self.socketPool.get(Integer.valueOf(i)) == socket) {
                newSocket = allocateSocketChannel();
                self.socketPool.put(Integer.valueOf(i), newSocket);
                self.socketStatusArray[i] = true;
                break;
            }
        }
    } catch (Exception e) {
        System.out.println("重建連接配接失敗!");
        throw new RuntimeException(e);
    }
    if (newSocket != null && socket != null) {
        try {
            socket.close();
        } catch (Exception e) {
            // Best effort: the slot already holds the replacement.
            e.printStackTrace();
        }
    }
    return newSocket;
}
/**
 * Returns a connection to the pool by marking its slot idle. A channel that is
 * not stored in the pool is ignored.
 *
 * <p>Synchronized on the class so the status write cannot race with the slot
 * scan in {@code getConnection}. A {@code null} argument is a no-op, and the
 * scan is bounded by the status array so it can never index past it.
 *
 * @param socket the pooled channel to release (compared by identity)
 * @throws Exception if the pool cannot be initialised
 */
public static synchronized void releaseConnection(SocketChannel socket) throws Exception {
    if (self == null) {
        init();
    }
    if (socket == null) {
        // Nothing to release; also avoids matching a missing (null) slot entry.
        return;
    }
    int slots = Math.min(CONNECTION_POOL_SIZE, self.socketStatusArray.length);
    for (int i = 0; i < slots; i++) {
        if (self.socketPool.get(Integer.valueOf(i)) == socket) {
            self.socketStatusArray[i] = false;
            break;
        }
    }
}
/**
 * Closes every connection held by the pool and marks the corresponding slots
 * idle. A failure to close one connection is printed and does not stop the
 * remaining connections from being closed.
 *
 * <p>If the pool was never initialised there is nothing to close, so the method
 * returns instead of opening a full set of connections only to close them.
 * All stored channels are visited by key, so a connection stored outside the
 * {@code 0..CONNECTION_POOL_SIZE-1} range is closed as well.
 *
 * <p>NOTE(review): the closed channels stay in the pool and are flagged idle,
 * so a later {@code getConnection()} can hand out a closed channel — confirm
 * whether callers are expected to re-run {@code init()} afterwards.
 *
 * @throws Exception declared for interface compatibility
 */
public synchronized static void releaseAllConnection() throws Exception {
    if (self == null) {
        return;
    }
    // Close all connections.
    for (Integer slot : self.socketPool.keySet()) {
        SocketChannel socket = self.socketPool.get(slot);
        try {
            socket.close();
            int i = slot.intValue();
            if (i >= 0 && i < self.socketStatusArray.length) {
                self.socketStatusArray[i] = false;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/**
 * Opens a non-blocking channel to {@code IP:PORT}, waits for the connection to
 * complete, and registers the channel for {@code OP_WRITE} on a newly opened
 * selector, which is stored in the static {@code selector} field.
 *
 * <p>Two failure modes of the straightforward version are handled here:
 * <ul>
 *   <li>A non-blocking {@code connect} may complete immediately and return
 *       {@code true}; in that case no connect event has to be awaited, so the
 *       channel is registered for writing right away instead of blocking in
 *       {@code select()}.</li>
 *   <li>If setup fails before any channel is handed to the caller, the opened
 *       channel is closed instead of being leaked.</li>
 * </ul>
 *
 * <p>NOTE(review): when {@code finishConnect()} itself throws, the channel has
 * already been assigned to the result and is still returned (unusable), as
 * before — confirm whether callers would prefer {@code null} in that case.
 *
 * @return the channel, or {@code null} if it could not be set up; exceptions are
 *         printed, not propagated
 */
public static SocketChannel allocateSocketChannel(){
    SERVER_ADDRESS = new InetSocketAddress(
            IP, PORT);
    SocketChannel socketChannel = null;
    SocketChannel client = null;
    try{
        socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        selector = Selector.open();
        socketChannel.register(selector, SelectionKey.OP_CONNECT);
        if (socketChannel.connect(SERVER_ADDRESS)) {
            // Connected immediately: there is no pending connect to wait for.
            socketChannel.register(selector, SelectionKey.OP_WRITE);
            client = socketChannel;
        } else {
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isConnectable()) {
                    client = (SocketChannel) selectionKey.channel();
                    if (client.isConnectionPending()) {
                        client.finishConnect();
                        client.register(selector, SelectionKey.OP_WRITE);
                        break;
                    }
                }
            }
        }
    }catch(Exception e){
        e.printStackTrace();
    }
    if (client == null && socketChannel != null) {
        // Nothing is handed to the caller, so release the half-open channel.
        try {
            socketChannel.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    return client;
}
/**
 * Exposes the value currently held in the static {@code selector} field.
 *
 * @return the stored selector, or {@code null} if none has been assigned yet
 */
public static Selector getSelector() {
    final Selector current = selector;
    return current;
}
使用連接池進行通信:
// Capacity in bytes of the outgoing message buffer.
private static int BLOCK = 8*4096;
// Scratch buffers are per instance rather than static: each instance talks over
// its own pooled connection, so sharing one set of buffers across instances
// would let concurrent send() calls overwrite each other's data.
// Holds the outgoing message assembled by send().
private ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);
// The three 4-byte fields read from the start of a reply.
private ByteBuffer protocalNum = ByteBuffer.allocate(4);
private ByteBuffer functionNum = ByteBuffer.allocate(4);
private ByteBuffer messageLen = ByteBuffer.allocate(4);
// Allocated per reply by send(), sized from the length field.
private ByteBuffer receivebuffer = null;
// Pooled connection and selector obtained in the constructor.
private SocketChannel client = null;
private Selector selector = null;
// Cleared by send() after the reply has been read / the request has been
// written; nothing in view sets them back to true.
private boolean readable = true;
private boolean writable = true;
/**
 * Claims a connection from {@code NIOConnectionPool} and picks up the pool's
 * current selector.
 *
 * @throws Exception if the pool cannot provide a connection
 */
public NIOSocketBackUp() throws Exception{
    // Order matters: fetching the connection may initialise the pool first.
    this.client = NIOConnectionPool.getConnection();
    this.selector = NIOConnectionPool.getSelector();
}
/**
 * Writes one request to the pooled connection, reads the complete reply, and
 * returns the connection to the pool.
 *
 * <p>The request is sent as protocol number, function number, message length
 * and XML bytes, in that order. The reply starts with three 4-byte fields; the
 * third is a length from which 12 is subtracted to get the body size. The body
 * is decoded as GBK.
 *
 * <p>Compared with a single {@code write}/{@code read} per field, this version
 * keeps writing until the whole request is out, reads each field until it is
 * full, fails with an exception when the peer closes the connection or sends an
 * impossible length (instead of looping forever or allocating a negative size),
 * and releases the connection even when an exception is thrown.
 *
 * <p>An instance is effectively single-use: {@code writable}/{@code readable}
 * are cleared here and the connection is released at the end.
 *
 * <p>NOTE(review): a connection that failed mid-exchange is released to the
 * pool as-is — confirm whether it should be rebuilt via
 * {@code NIOConnectionPool.rebuildConnection} instead.
 *
 * @param request the request whose fields are written to the server
 * @return the decoded reply body with leading/trailing whitespace trimmed
 * @throws Exception on I/O failure, premature end of stream, or an invalid
 *         length field
 */
public String send(ServiceRequest request) throws Exception {
    // Size of the three 4-byte fields that precede the reply body.
    final int headerBytes = 12;
    String receiveText = "";
    try {
        boolean awaitingReply = true;
        while (awaitingReply) {
            selector.select();
            // The selector's selected-key set.
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isWritable() && (writable)) {
                    sendbuffer.clear();
                    sendbuffer.put(request.getProtocalNum());
                    sendbuffer.put(request.getFunctionNum());
                    sendbuffer.put(request.getMessageLen());
                    sendbuffer.put(request.getXmlbytes());
                    sendbuffer.flip();
                    // A non-blocking write may accept only part of the buffer.
                    while (sendbuffer.hasRemaining()) {
                        client.write(sendbuffer);
                    }
                    client.register(selector, SelectionKey.OP_READ);
                    writable = false;
                } else if (selectionKey.isReadable() && (readable)) {
                    protocalNum.clear();
                    functionNum.clear();
                    messageLen.clear();
                    readFullyFromClient(protocalNum);
                    readFullyFromClient(functionNum);
                    readFullyFromClient(messageLen);
                    int length = messageLen.getInt(0);
                    if (length < headerBytes) {
                        throw new java.io.IOException(
                                "Invalid message length in reply: " + length);
                    }
                    receivebuffer = ByteBuffer.allocate(length - headerBytes);
                    // read() does not block on this channel, so keep reading
                    // until the whole body has arrived.
                    readFullyFromClient(receivebuffer);
                    client.register(selector, SelectionKey.OP_WRITE);
                    receiveText = new String(receivebuffer.array(), "GBK");
                    awaitingReply = false;
                    readable = false;
                    break;
                }
            }
        }
    } finally {
        // Always give the slot back, otherwise a failed exchange would keep it
        // marked as in use forever.
        NIOConnectionPool.releaseConnection(client);
    }
    return receiveText.trim();
}

/** Reads from the connection until {@code target} is full; fails if the stream ends first. */
private void readFullyFromClient(ByteBuffer target) throws java.io.IOException {
    while (target.hasRemaining()) {
        if (client.read(target) < 0) {
            throw new java.io.EOFException(
                    "Connection closed with " + target.remaining() + " byte(s) still expected");
        }
    }
}