天天看點

TCP 通信:服務端與客戶端的 Socket 程式設計實作

Socket程式設計

服務端TCPServer

服務端:監聽連接,並將每個連接提交到線程池并發處理

package communication.tcp;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Minimal TCP server: binds a {@link ServerSocket} on port 8200, accepts connections in a loop
 * and hands every accepted socket to a {@link TcpServerListener} executed on a shared thread pool.
 *
 * <p>The accept loop keeps running until {@link #shutdown()} is called.
 *
 * @author wangying
 * @version $Id: TcpServer.java, v 0.1 2019-4-19 12:09:31 PM wangying Exp $
 */
public class TCPServer extends Thread {

    private static final Logger logger = LoggerFactory.getLogger(TCPServer.class);

    /** Port the server socket listens on. */
    private static final int PORT = 8200;

    /** Thread pool that runs the accept loop (see {@link #main}) and one handler per client connection. */
    private static final ExecutorService pool = Executors.newFixedThreadPool(10);

    /**
     * Listening socket; {@code null} until bound and again after {@link #shutdown()}.
     * Volatile because {@link #shutdown()} is expected to be called from a different thread
     * than the one executing {@link #run()}.
     */
    private volatile ServerSocket server = null;

    /** Set by {@link #shutdown()} so the accept loop can tell a deliberate close from an error. */
    private volatile boolean closing = false;

    /**
     * Starts the server's accept loop on the shared pool.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // execute() instead of submit(): submit() parks any RuntimeException inside a Future
        // that nobody reads, so a crashed accept loop would disappear without a trace.
        pool.execute(new TCPServer());
    }

    /**
     * Binds the server socket if necessary, then accepts connections until {@link #shutdown()}
     * is called. Each accepted socket is wrapped in a {@link TcpServerListener} and handed to the pool.
     */
    @Override
    public void run() {
        try {
            if (!isBound()) {
                tryToBind();
            }
        } catch (IOException e) {
            // Without this log a busy port makes the server exit silently.
            logger.error("failed to bind server socket to port " + PORT, e);
            return;
        }

        while (!closing) {
            // Read the field once: shutdown() may null it between the loop check and accept().
            ServerSocket listening = server;
            if (listening == null) {
                break;
            }
            try {
                Socket client = listening.accept();
                if (logger.isDebugEnabled()) {
                    logger.debug("server recieve message from " + client.getInetAddress());
                }
                // Hand the connection to a pool thread so the accept loop is never blocked by a client.
                pool.execute(new TcpServerListener(client));
            } catch (IOException e) {
                if (closing) {
                    logger.warn("shutting down listen thread due to shutdown() call");
                    break;
                }
                logger.warn("control socket error: ", e);
            }
        }
    }

    /**
     * @return {@code true} when a server socket exists and has been bound
     */
    public boolean isBound() {
        ServerSocket current = server;
        return current != null && current.isBound();
    }

    /**
     * Creates the server socket and binds it to {@link #PORT}.
     *
     * @throws IOException if the socket cannot be created or bound (e.g. port already in use)
     */
    public void tryToBind() throws IOException {
        // Create the socket unbound first: SO_REUSEADDR must be set BEFORE bind() to have a
        // defined effect, which is impossible with the binding ServerSocket(int) constructor.
        ServerSocket socket = new ServerSocket();
        try {
            socket.setReuseAddress(true);
            socket.bind(new java.net.InetSocketAddress(PORT));
        } catch (IOException e) {
            // Do not leak the half-initialised socket when binding fails.
            try {
                socket.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        server = socket;

        if (socket.isBound()) {
            logger.info("server socket bound to " + socket.getLocalPort());
        } else {
            logger.info("server socket isn't bound");
        }
    }

    /**
     * Closes the TCP server socket and makes the accept loop in {@link #run()} terminate.
     * Safe to call when the socket was never bound.
     */
    public void shutdown() {
        closing = true;
        ServerSocket toClose = server;
        server = null;
        if (toClose == null) {
            return;
        }
        try {
            // Closing the socket makes a blocked accept() throw, which ends the loop.
            toClose.close();
        } catch (IOException e) {
            logger.error("shutdown tcp server failed", e);
        }
    }

}
           

連接處理 TcpServerListener

對於每個連接的處理,此處採用最簡單的 echo(回聲)方式響應客戶端

package communication.tcp;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.net.Socket;
import java.net.SocketException;

/**
 * Handles a single accepted client connection: reads one line of request text, replies with
 * {@code "i have received " + request}, then closes the connection.
 *
 * <p>Although this class extends {@link Thread}, {@code TCPServer} hands instances to an executor,
 * so {@link #run()} executes on a pool thread and the name set in the constructor applies only
 * to this (never started) thread object.
 *
 * @author wangying
 * @version $Id: TcpServerListener.java, v 0.1 2019-4-19 5:23:18 PM wangying Exp $
 */
public class TcpServerListener extends Thread {

    private static final Logger logger = LoggerFactory.getLogger(TcpServerListener.class);

    /** Charset used for both the request and the response. */
    private static final String CHARSET = "UTF-8";

    /** Read timeout applied to the client socket, in milliseconds. */
    private static final int SO_TIMEOUT_MS = 1000;

    private final Socket client;

    /**
     * @param client the accepted client socket; it is closed when {@link #run()} finishes
     */
    TcpServerListener(Socket client) {
        this.client = client;
        try {
            client.setSoTimeout(SO_TIMEOUT_MS);
        } catch (SocketException e) {
            logger.warn("Error while setting soTimeout to " + SO_TIMEOUT_MS, e);
        }
        this.setName("TcpServerListener-" + client.getRemoteSocketAddress());
    }

    /**
     * Reads one request line, writes the echo response and always closes the socket and its streams.
     */
    @Override
    public void run() {
        BufferedReader reader = null;
        Writer writer = null;
        try {
            // Full duplex: read the request first, then answer on the same socket.
            reader = new BufferedReader(new InputStreamReader(client.getInputStream(), CHARSET));
            String reqMsg = reader.readLine();
            if (reqMsg == null) {
                // readLine() returns null on end-of-stream: the peer closed without sending a line.
                logger.warn("client {} closed the connection before sending a request",
                        client.getRemoteSocketAddress());
                return;
            }
            logger.info("server recive client message:" + reqMsg);
            // A plain buffered Writer (not PrintWriter) so that write failures surface as
            // IOException instead of being silently swallowed.
            writer = new BufferedWriter(new OutputStreamWriter(client.getOutputStream(), CHARSET));
            handleRecvMessage(reqMsg, writer);
        } catch (IOException e) {
            logger.warn("a control connection broke", e);
        } finally {
            IOUtils.closeQuietly(writer);
            IOUtils.closeQuietly(reader);
            IOUtils.closeQuietly(client);
        }
    }

    /**
     * Builds the echo response for {@code msg} and writes it to {@code writer}.
     * Write failures are logged, not propagated.
     *
     * @param msg    the request line received from the client
     * @param writer destination for the response; flushed after writing
     */
    private void handleRecvMessage(String msg, Writer writer) {
        String respMsg = "i have received " + msg;
        logger.info("server response message:" + respMsg);
        try {
            writer.write(respMsg);
            writer.flush();
        } catch (IOException e) {
            logger.error("write response to collector error!", e);
        }
    }
}
           

客戶端 TcpClient

package communication.tcp;


import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.net.Socket;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Demo TCP client: each client loop sends one line of text to {@code localhost:8200} every
 * second and logs the single-line response.
 *
 * <p>Requests are executed as {@link SendCall} tasks on a shared sender pool; the client loops
 * started by {@link #main} run on their own, separate pool.
 *
 * @author wangying
 * @version $Id: TcpClient.java, v 0.1 2014-12-04 7:57:56 PM songfei01 Exp $
 */
public class TcpClient extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(TcpClient.class);

    /** Number of concurrent client loops started by {@link #main}. */
    private static final int CLIENT_COUNT = 10;

    /** Pool that executes the {@link SendCall} tasks, one per request. */
    private static final ExecutorService pool = Executors.newFixedThreadPool(10);

    /** Connect timeout in milliseconds (default 5000 ms). */
    private static final int connectTimeout = 5000;

    /** Read timeout in milliseconds applied to the socket. */
    private static final int DEFAULT_READE_TIMEOUT = 10000;

    /**
     * Starts {@link #CLIENT_COUNT} client loops.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // The loops must NOT share the sender pool: every loop blocks on a task it submits to
        // that pool, so filling the pool with loops would leave no thread to run the sends
        // (thread-starvation deadlock).
        ExecutorService loopPool = Executors.newFixedThreadPool(CLIENT_COUNT);
        for (int i = 0; i < CLIENT_COUNT; i++) {
            loopPool.execute(new TcpClient());
        }
    }

    /**
     * Sends a request once per second until the executing thread is interrupted.
     */
    @Override
    public void run() {
        String host = "localhost";
        String port = "8200";
        while (!Thread.currentThread().isInterrupted()) {
            String request = "我是" + Thread.currentThread().getName() + "用戶端";
            String resp = post(host, port, request, "UTF-8");
            logger.info("收到響應:" + resp);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the flag and stop instead of looping forever after an interrupt.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Sends {@code request} to {@code host:port} and waits for the response.
     *
     * @param host    target host name or address
     * @param port    target port as a decimal string
     * @param request the text to send (a newline is appended by {@link #sendMessage})
     * @param charset charset name used to encode the request and decode the response
     * @return the first response line, {@code null} if {@link #sendMessage} failed, or a
     *         {@code "發送失敗:" + host} marker when the request could not be executed at all
     */
    public String post(String host, String port, String request, String charset) {
        try {
            // Submit the request and block until the response arrives.
            return send(request, charset, host, Integer.parseInt(port)).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("interrupted while waiting for response from " + host + ":" + port, e);
            return "發送失敗:" + host;
        } catch (Exception e) {
            logger.warn("request to " + host + ":" + port + " failed", e);
            return "發送失敗:" + host;
        }
    }

    /**
     * Submits the request to the sender pool.
     *
     * @param requestMsg the text to send
     * @param charset    charset name used for encoding/decoding
     * @param ip         target host name or address
     * @param port       target port
     * @return a future that completes with the result of {@link #sendMessage}
     */
    public Future<String> send(String requestMsg, String charset, String ip, int port) {
        return pool.submit(new SendCall(requestMsg, charset, ip, port));
    }

    /**
     * Opens a connection, writes {@code msg} followed by a newline and reads one response line.
     *
     * @param msg     the text to send
     * @param charset charset name used for encoding/decoding
     * @param host    target host name or address
     * @param port    target port
     * @return the first line of the response, or {@code null} if an I/O error occurred or the
     *         peer closed the connection without answering
     */
    public String sendMessage(String msg, String charset, String host, int port) {
        BufferedReader reader = null;
        BufferedWriter writer = null;
        Socket socket = null;
        try {
            // Connect explicitly so the connect timeout is actually applied;
            // new Socket(host, port) would block without any timeout.
            socket = new Socket();
            socket.connect(new java.net.InetSocketAddress(host, port), connectTimeout);
            // Read timeout for the response.
            socket.setSoTimeout(DEFAULT_READE_TIMEOUT);
            // Write the request.
            writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), charset));
            logger.info("send data to {}:{}", host, port);
            logger.info("發送 REQUEST:{}", msg);
            writer.write(msg + "\n");
            writer.flush();
            // Read the response.
            reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), charset));
            return reader.readLine();
        } catch (IOException e) {
            logger.error("發送消息失敗", e);
        } finally {
            // Close everything quietly; failures while closing are not interesting here.
            IOUtils.closeQuietly(reader);
            IOUtils.closeQuietly(writer);
            IOUtils.closeQuietly(socket);
        }
        return null;
    }

    /**
     * Task that performs one request via {@link #sendMessage} on the sender pool.
     */
    private class SendCall implements Callable<String> {
        private final String requestMsg;
        private final String charset;
        private final String ip;
        private final int port;

        /**
         * @param requestMsg the text to send
         * @param charset    charset name used for encoding/decoding
         * @param ip         target host name or address
         * @param port       target port
         */
        public SendCall(String requestMsg, String charset, String ip, int port) {
            this.requestMsg = requestMsg;
            this.charset = charset;
            this.ip = ip;
            this.port = port;
        }

        /**
         * Executes the request.
         *
         * @return the response line, or a failure marker if an unexpected exception occurred
         * @see Callable#call()
         */
        @Override
        public String call() throws Exception {
            try {
                return sendMessage(requestMsg, charset, ip, port);
            } catch (Exception e) {
                logger.warn("send task failed, ip:" + this.ip + " port:" + this.port, e);
                return "發送失敗,ip:" + this.ip + "port:" + this.port;
            }
        }
    }


}