Socket程式設計
服務端TCPServer
服務端:監聽連接,並將每個連接提交到線程池並發處理
package communication.tcp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * TCP server that listens on port 8200 and hands every accepted connection
 * to a {@link TcpServerListener} task running on a shared fixed-size pool.
 *
 * <p>{@link #shutdown()} may be called from another thread while
 * {@link #run()} is blocked in {@code accept()}; the listening socket is
 * therefore published through a volatile field and the accept loop works on
 * a local snapshot of it.
 *
 * @author wangying
 * @version $Id: TcpServer.java, v 0.1 2019-4-19 12:09:31 PM wangying Exp $
 */
public class TCPServer extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(TCPServer.class);

    /** Port the server socket is bound to. */
    private static final int PORT = 8200;

    /** Thread pool that processes the accepted client connections. */
    private static final ExecutorService pool = Executors.newFixedThreadPool(10);

    /** Listening socket; {@code null} before binding and after {@link #shutdown()}. */
    private volatile ServerSocket server = null;

    /** Set by {@link #shutdown()} to tell the accept loop to stop. */
    private volatile boolean closing = false;

    /**
     * Starts the accept loop on its own thread.
     *
     * <p>The loop is started as a thread rather than submitted to the worker
     * pool so that it does not occupy a worker slot and so that an uncaught
     * exception is reported instead of being hidden inside an unread Future.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new TCPServer().start();
    }

    /**
     * Binds the server socket if necessary, then accepts connections until
     * {@link #shutdown()} is called or the listening socket is closed.
     */
    @Override
    public void run() {
        try {
            if (!isBound()) {
                tryToBind();
            }
        } catch (IOException e) {
            logger.error("failed to bind server socket to port " + PORT, e);
            return;
        }
        while (!closing) {
            // Snapshot the field: shutdown() may null it out concurrently.
            ServerSocket listening = server;
            if (listening == null) {
                break;
            }
            try {
                Socket client = listening.accept();
                logger.debug("server recieve message from {}", client.getInetAddress());
                // Hand the connection to a listener task on the worker pool.
                pool.submit(new TcpServerListener(client));
            } catch (IOException e) {
                if (closing) {
                    logger.warn("shutting down listen thread due to shutdown() call");
                    break;
                }
                logger.warn("control socket error: ", e);
                if (listening.isClosed()) {
                    // accept() can never succeed again; avoid a busy loop.
                    break;
                }
            }
        }
    }

    /**
     * Reports whether a listening socket exists and is bound.
     *
     * @return {@code true} if the server socket has been created and bound
     */
    public boolean isBound() {
        ServerSocket current = server;
        return current != null && current.isBound();
    }

    /**
     * Creates the server socket and binds it to port 8200.
     *
     * <p>SO_REUSEADDR is enabled on an unbound socket before binding, because
     * changing the option after the socket is bound is not guaranteed to have
     * any effect.
     *
     * @throws IOException if the socket cannot be created or bound
     */
    public void tryToBind() throws IOException {
        ServerSocket socket = new ServerSocket();
        try {
            socket.setReuseAddress(true);
            socket.bind(new java.net.InetSocketAddress(PORT));
        } catch (IOException e) {
            // Do not leak the half-initialised socket.
            try {
                socket.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        server = socket;
        logger.info("server socket bound to {}", socket.getLocalPort());
    }

    /**
     * Stops the accept loop and closes the TCP server socket.
     *
     * <p>Closing the socket makes a blocked {@code accept()} fail with an
     * exception, which the accept loop treats as the shutdown signal.
     */
    public void shutdown() {
        closing = true;
        ServerSocket toClose = server;
        server = null;
        if (toClose == null) {
            return;
        }
        try {
            toClose.close();
        } catch (IOException e) {
            logger.error("shutdown tcp server failed", e);
        }
    }
}
連接處理 TcpServerListener
對於每個連接的處理,此處為最簡單的 echo:將收到的消息回應給客戶端
package communication.tcp;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.Socket;
import java.net.SocketException;
/**
 * Handles a single accepted client connection: reads one line from the
 * client, writes back an echo-style acknowledgement and closes the socket.
 *
 * @author wangying
 * @version $Id: TcpServerListener.java, v 0.1 2019-4-19 5:23:18 PM wangying Exp $
 */
public class TcpServerListener extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(TcpServerListener.class);

    /** Character set used for both the request and the response. */
    private static final String CHARSET = "UTF-8";

    /** Read timeout applied to the client socket, in milliseconds. */
    private static final int READ_TIMEOUT_MS = 1000;

    /** Connection this listener is responsible for. */
    private final Socket client;

    /**
     * Creates a listener for the given connection and applies the read timeout.
     *
     * @param client accepted client socket; closed when {@link #run()} finishes
     */
    TcpServerListener(Socket client) {
        this.client = client;
        try {
            client.setSoTimeout(READ_TIMEOUT_MS);
        } catch (SocketException e) {
            logger.warn("Error while setting soTimeout to " + READ_TIMEOUT_MS, e);
        }
        this.setName("TcpServerListener-" + client.getRemoteSocketAddress());
    }

    /**
     * Reads one request line, sends the response and always closes the
     * connection, whether or not the exchange succeeded.
     */
    @Override
    public void run() {
        BufferedReader reader = null;
        Writer writer = null;
        try {
            reader = new BufferedReader(new InputStreamReader(client.getInputStream(), CHARSET));
            String reqMsg = reader.readLine();
            if (reqMsg == null) {
                // Peer closed the connection without sending anything.
                logger.info("client {} closed the connection before sending a request",
                    client.getRemoteSocketAddress());
                return;
            }
            logger.info("server recive client message:" + reqMsg);
            // A plain Writer is used instead of PrintWriter because PrintWriter
            // swallows I/O errors, which would hide failed responses.
            writer = new BufferedWriter(new OutputStreamWriter(client.getOutputStream(), CHARSET));
            handleRecvMessage(reqMsg, writer);
        } catch (IOException e) {
            logger.warn("a control connection broke", e);
        } finally {
            IOUtils.closeQuietly(writer);
            IOUtils.closeQuietly(reader);
            IOUtils.closeQuietly(client);
        }
    }

    /**
     * Builds the acknowledgement for a request and writes it to the client.
     *
     * <p>The response is written without a line terminator; a peer reading
     * line by line receives it once this side closes the connection.
     *
     * @param msg    request line received from the client
     * @param writer writer connected to the client's output stream
     */
    private void handleRecvMessage(String msg, Writer writer) {
        String respMsg = "i have received " + msg;
        logger.info("server response message:" + respMsg);
        try {
            writer.write(respMsg);
            writer.flush();
        } catch (IOException e) {
            logger.error("write response to collector error!", e);
        }
    }
}
客戶端 TcpClient
package communication.tcp;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.Socket;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * Demo TCP client: each client loop repeatedly sends one line to
 * localhost:8200 and logs the line it gets back.
 *
 * @author wangying
 * @version $Id: TcpClient.java, v 0.1 2014-12-04 7:57:56 PM songfei01 Exp $
 */
public class TcpClient extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(TcpClient.class);

    /** Pool that executes the individual {@link SendCall} tasks. */
    private static final ExecutorService pool = Executors.newFixedThreadPool(10);

    /**
     * Pool that runs the long-lived client loops started by {@link #main}.
     *
     * <p>It is deliberately separate from {@link #pool}: every loop blocks on
     * the Future of a send task, so running ten loops on the ten-thread send
     * pool would leave no thread free to execute those tasks and every loop
     * would wait forever.
     */
    private static final ExecutorService loopPool = Executors.newFixedThreadPool(10);

    /** Connect timeout in milliseconds (default 5000 ms). */
    private static final int connectTimeout = 5000;

    /** Read timeout in milliseconds. */
    private static final int DEFAULT_READE_TIMEOUT = 10000;

    /**
     * Starts ten concurrent client loops.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            loopPool.submit(new TcpClient());
        }
    }

    /**
     * Sends a request once per second until the running thread is interrupted.
     */
    @Override
    public void run() {
        String host = "localhost";
        String port = "8200";
        while (!Thread.currentThread().isInterrupted()) {
            String request = "我是" + Thread.currentThread().getName() + "用戶端";
            String resp = post(host, port, request, "UTF-8");
            logger.info("收到響應:" + resp);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the flag and stop instead of looping on regardless.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Sends a request and waits for the response.
     *
     * @param host    server host name or address
     * @param port    server port as a decimal string
     * @param request message to send
     * @param charset character set used for the request and the response
     * @return the response line, {@code null} if the exchange failed inside
     *         {@link #sendMessage}, or a failure text if the send task could
     *         not be started or awaited
     */
    public String post(String host, String port, String request, String charset) {
        try {
            // Send the request and block until the response is available.
            return send(request, charset, host, Integer.parseInt(port)).get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            logger.warn("post to {}:{} failed", host, port, e);
            return "發送失敗:" + host;
        }
    }

    /**
     * Queues a send task on the send pool.
     *
     * @param requestMsg message to send
     * @param charset    character set used for the request and the response
     * @param ip         server host name or address
     * @param port       server port
     * @return a Future that yields the result of the send task
     */
    public Future<String> send(String requestMsg, String charset, String ip, int port) {
        return pool.submit(new SendCall(requestMsg, charset, ip, port));
    }

    /**
     * Opens a connection, writes the message as one line and reads one
     * response line.
     *
     * @param msg     message to send; a newline is appended
     * @param charset character set used for the request and the response
     * @param host    server host name or address
     * @param port    server port
     * @return the response line, or {@code null} if an I/O error occurred or
     *         the server closed the connection without sending data
     */
    public String sendMessage(String msg, String charset, String host, int port) {
        BufferedReader reader = null;
        BufferedWriter writer = null;
        Socket socket = null;
        try {
            // Connect explicitly so that the connect timeout is actually applied.
            socket = new Socket();
            socket.connect(new java.net.InetSocketAddress(host, port), connectTimeout);
            socket.setSoTimeout(DEFAULT_READE_TIMEOUT);

            writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), charset));
            logger.info("send data to {}:{}", host, port);
            logger.info("發送 REQUEST:{}", msg);
            writer.write(msg + "\n");
            writer.flush();

            reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), charset));
            return reader.readLine();
        } catch (IOException e) {
            logger.error("發送消息失敗", e);
        } finally {
            // Close quietly; a failure here must not mask the result.
            IOUtils.closeQuietly(reader);
            IOUtils.closeQuietly(writer);
            IOUtils.closeQuietly(socket);
        }
        return null;
    }

    /**
     * Send task: performs one request/response exchange via
     * {@link #sendMessage}.
     */
    private class SendCall implements Callable<String> {
        private final String requestMsg;
        private final String charset;
        private final String ip;
        private final int port;

        /**
         * @param requestMsg message to send
         * @param charset    character set used for the request and the response
         * @param ip         server host name or address
         * @param port       server port
         */
        public SendCall(String requestMsg, String charset, String ip, int port) {
            super();
            this.requestMsg = requestMsg;
            this.charset = charset;
            this.ip = ip;
            this.port = port;
        }

        /**
         * Executes the exchange.
         *
         * @return the response line, or a failure text if sending threw
         * @see Callable#call()
         */
        @Override
        public String call() throws Exception {
            try {
                return sendMessage(requestMsg, charset, ip, port);
            } catch (Exception e) {
                return "發送失敗,ip:" + this.ip + "port:" + this.port;
            }
        }
    }
}