“你們的agent占了好多系統的端口,把我們的很多業務系統都給整死了,給我們造成了很大的損失,要求你們的相關上司下周過來道歉” -- 來自我們的一個客戶。
怎麼可能呢,我們都不相信,我們的agent隻占一個端口啊!
事實勝過雄辯,經過查證,确實是由于我們的agent占了好多系統的端口,我看了一下日志,基本把系統可用的端口占完了!
為什麼呢?MINA架構私自開的!
由于我們的agent端使用了NIO通信架構MINA,但并沒有使用好,造成了這一幾乎毀滅行的災難。
還是先看代碼吧。
/**
* 異步發送消息
* @param agent
* @param request
*/
public void sendMessageToAgent(Agent agent, HyRequest request) {
IoSession session = null;
IoConnector connector=null;
long startTime = System.currentTimeMillis();
try {
// 建立一個非阻塞的用戶端程式
connector = new NioSocketConnector();
// 設定連結逾時時間
connector.setConnectTimeoutMillis(connectTimeoutMillis);
ObjectSerializationCodecFactory objsCodec = new ObjectSerializationCodecFactory();
objsCodec.setDecoderMaxObjectSize(DEFAULTDECODER);
objsCodec.setEncoderMaxObjectSize(DEFAULTDECODER);
ProtocolCodecFilter codecFilter = new ProtocolCodecFilter(
objsCodec);
// 資料轉換,編碼設定
connector.getFilterChain()
.addLast("codec", codecFilter);
// 消息
connector.setHandler(clientHandler);
SocketAddress socketAddress = new InetSocketAddress(
agent.getIpAddr(), agent.getAgentPort());
ConnectFuture future = connector.connect(socketAddress);
future.awaitUninterruptibly();
session = future.getSession();
String json = mapper.writeValueAsString(request);
session.write(json);
long endTime = System.currentTimeMillis();
logerr.debug("send-time:" + (endTime - startTime));
} catch (Exception e) {
logerr.error("host:" + agent.getIpAddr() + ", AgentPORT:" + agent.getAgentPort()
+ ", 連接配接異常..."+e.getMessage());
clientHandler.handlerConnectError(agent, request);
}
}
public class MinaClientHandler extends IoHandlerAdapter {
// 日志
private Logger log = Logger.getLogger(getClass());
private MinaResponseProcesser minaResponseProcesser;
ObjectMapper mapper=null;
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
String msg = message.toString();
log.info("receive message from " + session.getRemoteAddress().toString() + ",message:" + message);
if(null == mapper){
mapper = new ObjectMapper();
}
//請求消息轉換為HyResponse對象
HyResponse response = mapper.readValue(msg, HyResponse.class);
String remoteIp= ((InetSocketAddress)session.getRemoteAddress()).getAddress().getHostAddress();
response.setRemoteIp(remoteIp);
HyRequest request = minaResponseProcesser.processResponse(response);
if(request == null){
//關閉目前session
closeSessionByServer(session,response);
}else{
session.write(mapper.writeValueAsString(request));
}
}
}
上面的邏輯就是,當要發送一個消息時,建立一個新的connector,并擷取一個session發送消息後直接傳回,在MinaClientHandler類的messageReceived裡面處理接受到的響應資料,并進行業務處理,最後如果不需要再次發送請求,則關閉目前session。
其實出現本文一開始的問題就是在這裡造成的。
在出現我們的agent占用大量端口後,我們這邊的工程人員就迅速定位到了這個問題,并很快修複了,但修複并不理想,但修複過後的代碼。
/**
* 異步發送消息
* @param agent
* @param request
*/
public void sendMessageToAgent(Agent agent, HyRequest request) {
IoSession session = null;
IoConnector connector=null;
long startTime = System.currentTimeMillis();
try {
// 建立一個非阻塞的用戶端程式
connector = new NioSocketConnector();
// 設定連結逾時時間
connector.setConnectTimeoutMillis(connectTimeoutMillis);
ObjectSerializationCodecFactory objsCodec = new ObjectSerializationCodecFactory();
objsCodec.setDecoderMaxObjectSize(DEFAULTDECODER);
objsCodec.setEncoderMaxObjectSize(DEFAULTDECODER);
ProtocolCodecFilter codecFilter = new ProtocolCodecFilter(
objsCodec);
// 資料轉換,編碼設定
connector.getFilterChain()
.addLast("codec", codecFilter);
// 消息
connector.setHandler(clientHandler);
SocketAddress socketAddress = new InetSocketAddress(
agent.getIpAddr(), agent.getAgentPort());
ConnectFuture future = connector.connect(socketAddress);
future.awaitUninterruptibly();
session = future.getSession();
String json = mapper.writeValueAsString(request);
session.write(json);
// 等待斷開連接配接
session.getCloseFuture().awaitUninterruptibly();
long endTime = System.currentTimeMillis();
logerr.debug("send-time:" + (endTime - startTime));
//connector.dispose();
} catch (Exception e) {
logerr.error("host:" + agent.getIpAddr() + ", AgentPORT:" + agent.getAgentPort()
+ ", 連接配接異常..."+e.getMessage());
clientHandler.handlerConnectError(agent, request);
}finally{
if(null!=session){
session.close(true);
session=null;
}
if(null !=connector){
connector.dispose();
}
}
}
隻改了一個地方,就是在發送完消息後,加了一個等待斷開連接配接語句和finally語句塊-關閉session和connector。
雖然不會出現程式占用大量的系統端口這個問題,但會造成另外一個問題-當有一個消息隊列需要異步調用上面語句發送消息時,有原來的異步(發送完直接傳回,相當于快速并發發送)變成僞異步(發送完消息後并等待消息傳回處理後傳回,相當于順序處理隊列裡面的消息)。
上面的修改并不是我們想要的結果,但至少修複了占用大量端口的問題。
由于懷着想徹底修複這個問題的想法,我想還是深入了解一下MINA源碼吧。