天天看點

Android網絡程式設計(十四) 之 Socket與NIO1 簡介2 NIO的元件3 Socket與NIO4 總結

1 簡介

NIO(Non-Blocking I/O或叫New I/O)是一種同步非阻塞的I/O模型,主要用于服務端解決高并發或者大量連接配接的情況的IO處理。它是JDK1.4中引入的,位于java.nio包中,主要用于彌補原來同步阻塞I/O(Blocking I/O或叫BIO)的不足。在NIO出現之前大多服務端主要使用BIO通過建立線程的方式來解決并發請求,如上一篇博文《Android網絡程式設計(十三) 之 Socket和長連接配接》中的長連接配接Demo,在每個用戶端請求連接配接後都會建立一個新的Socket對象并内部建立線程來處理相關連接配接,這樣就很容易因線程瓶頸而造成很多限制。

NIO在處理讀寫是采用了記憶體映射檔案的方式,它基于通道(Channel)和緩沖區(Buffer)進行操作,資料從通道讀取到緩沖區或者從緩沖區寫入到通道,再通過選擇器(Selector)進行監聽多個通道的事件,是以差別于BIO的面向流方式,NIO可更加高效地進行檔案的讀寫操作。

2 NIO的元件

2.1 Buffer(緩沖區)

BIO的操作是面向資料流的讀寫,而NIO所有的資料都是用Buffer緩沖區處理的,緩沖區其實就是一塊連續的記憶體空間,這塊記憶體空間就像一個資料容器般,可以重複的讀取資料和寫入資料。

2.2 Channel(通道)

Channel通道跟BIO中的Stream類似,都是用于跟連接配接的對象進行IO操作。它們差別于,Stream是阻塞的單向操作的,即要麼讀要麼寫,比如InputStream和OutputStream;而Channel是非阻塞且是線程安全的雙向操作的,通過一個Channel既可以進行讀也可進行寫操作,其所有資料都是映射到記憶體中通過Buffer來處理,Server端邏輯中,一個Client端會對應一個Channel。

2.3 Selector(選擇器)

在BIO中當一個Server端連接配接着多個Client端時,Server端會為其建立一個線程來提升并發吞吐量,但是一旦并發量上升就會出現明顯的弊端。在這情況Selector的優勢就出現了。Selector叫做選擇器,或者叫做多路複用器,Selector運作在單個線程中但可同時管理一個或多個Channel。它通過不斷地輪詢進行Channel的狀态的檢查處理其連接配接、讀、寫等操作。意味着可以使用更少的線程來處理多個Client端的請求,避免了使用線程的開銷。Server邏輯中,僅需要一個Selector,但它可以管理多個Channel。

3 Socket與NIO

我們還是用一個簡單的Demo來實作一個Socket,不過這次使用了NIO的方式。Demo中服務端在App的Service中進行,而用戶端在App的Activity中進行,為了展示出服務端可以同時接收多個用戶端,Activity的界面特意做了兩套用戶端,如下圖所示。

Android網絡程式設計(十四) 之 Socket與NIO1 簡介2 NIO的元件3 Socket與NIO4 總結

3.1 服務端代碼

TCPServerService.java

public class TCPServerService extends Service {
    private static final String TAG = "TCPServerService----------";

    public final static int SERVER_PORT = 9527;                     // 跟用戶端絕定的端口

    private TCPServer mTCPServer;
    private ThreadPoolExecutor mConnectThreadPool;                  // 總的連接配接線程池

    @Override
    public void onCreate() {
        super.onCreate();
        init();
        initTcpServer();
    }

    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }

    @Override
    public void onDestroy() {
        super.onDestroy();
        unInitTcpServer();
    }

    private void init() {
        mConnectThreadPool = new ThreadPoolExecutor(
                1,
                1,
                0,
                TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "server_thread_pool");
                    }
                },
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        Log.e(TAG, "已啟動連接配接,請免重複操作");
                    }
                }
        );
    }

    /**
     * 初始化TCP服務
     */
    private void initTcpServer() {
        mConnectThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                mTCPServer = new TCPServer();
                mTCPServer.init();
            }
        });
    }

    /**
     * 反初始化TCP服務
     */
    private void unInitTcpServer() {
        mTCPServer.close();
    }
}
           

服務端的實作在TCPServerService中,TCPServerService服務啟動後,便建立一個線程來建立一個TCPServer對象并執行初始化。

TCPServer.java

public class TCPServer {
    private final static String TAG = "TCPServer----------";

    private String mSendMsg;
    private Selector mSelector;

    public void init() {
        ServerSocketChannel serverSocketChannel = null;
        try {
            serverSocketChannel = ServerSocketChannel.open();
            // 設定非阻塞
            serverSocketChannel.configureBlocking(false);
            // 擷取與此Channel關聯的ServerSocket并綁定端口
            serverSocketChannel.socket().bind(new InetSocketAddress(TCPServerService.SERVER_PORT));
            // 注冊到Selector,等待連接配接
            mSelector = Selector.open();
            serverSocketChannel.register(mSelector, SelectionKey.OP_ACCEPT);
            while (mSelector != null && mSelector.isOpen()) {
                // 選擇一組對應Channel已準備好進行I/O的Key
                int select = mSelector.select();
                if (select <=0) {
                    continue;
                }
                // 獲得Selector已選擇的Keys
                Set<SelectionKey> selectionKeys = mSelector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();

                    // 移除目前的key
                    iterator.remove();

                    if (selectionKey.isValid() && selectionKey.isAcceptable()) {
                        handleAccept(selectionKey);
                    }
                    if (selectionKey.isValid() && selectionKey.isReadable()) {
                        handleRead(selectionKey);
                    }
                    if (selectionKey.isValid() && selectionKey.isWritable()) {
                        handleWrite(selectionKey);
                    }

                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (mSelector != null) {
                    mSelector.close();
                    mSelector = null;
                }
                if (serverSocketChannel != null) {
                    serverSocketChannel.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void handleAccept(SelectionKey selectionKey) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
        SocketChannel client = server.accept();
        client.configureBlocking(false);
        // 注冊讀就緒事件
        client.register(mSelector, SelectionKey.OP_READ);
        Log.d(TAG, "服務端 同意 用戶端(" + client.getRemoteAddress() + ") 的連接配接請求");
    }

    private void handleRead(SelectionKey selectionKey) throws IOException {
        SocketChannel client = (SocketChannel) selectionKey.channel();

        //讀取伺服器發送來的資料到緩沖區中
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int bytesRead = client.read(byteBuffer);
        if (bytesRead > 0) {
            String inMsg = new String(byteBuffer.array(), 0, bytesRead);
            // 處理資料
            responseMsg(selectionKey, inMsg);
        }
        else {
            Log.d(TAG, "服務端 斷開跟 用戶端(" + client.getRemoteAddress() + ") 的連接配接");
            client.close();
        }
    }

    private void handleWrite(SelectionKey selectionKey) throws IOException {
        if (TextUtils.isEmpty(mSendMsg)) {
            return;
        }
        SocketChannel client = (SocketChannel) selectionKey.channel();

        ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
        sendBuffer.put(mSendMsg.getBytes());
        sendBuffer.flip();

        client.write(sendBuffer);
        mSendMsg = null;

        client.register(mSelector, SelectionKey.OP_READ);
    }

    /**
     * 處理資料
     *
     * @param selectionKey
     * @param inMsg
     * @throws IOException
     */
    private void responseMsg(SelectionKey selectionKey, String inMsg) throws IOException {
        SocketChannel client = (SocketChannel) selectionKey.channel();
        Log.d(TAG, "服務端 收到 用戶端(" + client.getRemoteAddress() + ") 資料:" + inMsg);

        // 估計1億的AI代碼
        String outMsg = inMsg;
        outMsg = outMsg.replace("嗎", "");
        outMsg = outMsg.replace("?", "!");
        outMsg = outMsg.replace("?", "!");
        sendMsg(selectionKey, outMsg);
    }

    /**
     * 發送資料
     *
     * @param selectionKey
     * @param msg
     * @throws IOException
     */
    public void sendMsg(SelectionKey selectionKey, String msg) throws IOException {
        mSendMsg = msg;
        SocketChannel client = (SocketChannel) selectionKey.channel();
        client.register(mSelector, SelectionKey.OP_WRITE);
        Log.d(TAG, "服務端 給 用戶端(" + client.getRemoteAddress() + ") 發送資料:" + msg);
    }

    /**
     * 斷開連接配接
     */
    public void close() {
        try {
            Log.d(TAG, "服務端中斷所有連接配接");
            mSelector.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
           

TCPServer類核心代碼就是init方法,可見方法記憶體在ServerSocketChannel和Selector,它們便是我們上面介紹的通道和選擇器。除此外還有一個SelectionKey,它是用于維護Channel和Selector的對應關系。而且可見TCPServer類内不再需要對每個用戶端的連接配接再開啟新線程。

SelectionKey裡頭有四個常量:SelectionKey.OP_CONNECT、SelectionKey.OP_ACCEPT、SelectionKey.OP_READ、SelectionKey.OP_WRITE,它們表示Channel注冊到Selectort感興趣的事件。對應selectionKey.isConnectable()、selectionKey.isAcceptable()、selectionKey.isReadable()、selectionKey.isWritable()方法會傳回true,是以可以了解成,主要注冊了相應的事件,上述循環中便會執行相應傳回true的動作。

3.2 用戶端代碼

activity_main.xml

<?xml version="1.0" encoding="utf-8"?>
<RelativeLayout xmlns:android="http://schemas.android.com/apk/res/android"
    xmlns:tools="http://schemas.android.com/tools"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    tools:context=".MainActivity">

    <Button
        android:id="@+id/btn_connection1"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginTop="80dp"
        android:layout_marginLeft="30dp"
        android:text="連接配接1" />

    <Button
        android:id="@+id/btn_send1"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginTop="80dp"
        android:layout_centerHorizontal="true"
        android:text="發送1" />

    <Button
        android:id="@+id/btn_disconnect1"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginTop="80dp"
        android:layout_alignParentRight="true"
        android:layout_marginRight="30dp"
        android:text="斷開1" />

    <Button
        android:id="@+id/btn_connection2"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginTop="180dp"
        android:layout_marginLeft="30dp"
        android:text="連接配接2" />

    <Button
        android:id="@+id/btn_send2"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginTop="180dp"
        android:layout_centerHorizontal="true"
        android:text="發送2" />

    <Button
        android:id="@+id/btn_disconnect2"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginTop="180dp"
        android:layout_alignParentRight="true"
        android:layout_marginRight="30dp"
        android:text="斷開2" />

</RelativeLayout>
           

MainActivity.java

public class MainActivity extends AppCompatActivity {
    private TCPClient mTcpClient1;
    private TCPClient mTcpClient2;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        Intent service = new Intent(this, TCPServerService.class);
        startService(service);

        mTcpClient1 = new TCPClient("用戶端A");
        mTcpClient2 = new TCPClient("用戶端B");

        Button btnConnection1 = findViewById(R.id.btn_connection1);
        btnConnection1.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                mTcpClient1.requestConnectTcp();
            }
        });
        Button btnSend1 = findViewById(R.id.btn_send1);
        btnSend1.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                mTcpClient1.sendMsg("2_你好嗎?");
            }
        });
        Button btnDisconnect1 = findViewById(R.id.btn_disconnect1);
        btnDisconnect1.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                mTcpClient1.disconnectTcp();
            }
        });


        Button btnConnection2 = findViewById(R.id.btn_connection2);
        btnConnection2.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                mTcpClient2.requestConnectTcp();
            }
        });
        Button btnSend2 = findViewById(R.id.btn_send2);
        btnSend2.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                mTcpClient2.sendMsg("2_吃飯了嗎?");
            }
        });
        Button btnDisconnect2 = findViewById(R.id.btn_disconnect2);
        btnDisconnect2.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                mTcpClient2.disconnectTcp();
            }
        });
    }
}
           

用戶端的實作在MainActivity中,MainActivity主要是建立了兩個TCPClient對象,然後對應界面中的按鈕作相應的邏輯。

TCPClient.java

public class TCPClient {
    private static final String TAG = "TCPClient**********";

    private String mSendMsg;
    private String mClientName;                                                 // 用戶端命名
    private Selector mSelector;
    private SocketChannel mSocketChannel;

    private ThreadPoolExecutor mConnectThreadPool;                              // 消息連接配接和接收的線程池

    public TCPClient(String clientName) {
        init(clientName);
    }

    /**
     * 基本初始化
     *
     * @param clientName
     */
    private void init(String clientName) {
        mClientName = clientName;
        mConnectThreadPool = new ThreadPoolExecutor(
                1,
                1,
                0,
                TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "client_connection_thread_pool");
                    }
                },
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        Log.e(TAG, mClientName + " 已啟動連接配接,請免重複操作");
                    }
                }
        );
    }

    /**
     * 請求連接配接服務端
     */
    public void requestConnectTcp() {
        mConnectThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                initSocketAndReceiveMsgLoop();
            }
        });
    }

    /**
     *
     */
    private void initSocketAndReceiveMsgLoop() {
        try {
            mSocketChannel = SocketChannel.open();
            // 設定為非阻塞方式
            mSocketChannel.configureBlocking(false);
            // 連接配接服務端位址和端口
            mSocketChannel.connect(new InetSocketAddress("127.0.0.1", TCPServerService.SERVER_PORT));

            // 注冊到Selector,請求連接配接
            mSelector = Selector.open();
            mSocketChannel.register(mSelector, SelectionKey.OP_CONNECT);
            while (mSelector != null && mSelector.isOpen() && mSocketChannel != null && mSocketChannel.isOpen()) {
                // 選擇一組對應Channel已準備好進行I/O的Key
                int select = mSelector.select();                            // 當沒有消息時,這裡也是會阻塞的
                if (select <= 0) {
                    continue;
                }
                Set<SelectionKey> selectionKeys = mSelector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();

                    // 移除目前的key
                    iterator.remove();

                    if (selectionKey.isValid() && selectionKey.isConnectable()) {
                        handleConnect();
                    }
                    if (selectionKey.isValid() && selectionKey.isReadable()) {
                        handleRead();
                    }
                    if (selectionKey.isValid() && selectionKey.isWritable()) {
                        handleWrite();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close();
        }
    }

    private void handleConnect() throws IOException {
        // 判斷此通道上是否正在進行連接配接操作。
        if (mSocketChannel.isConnectionPending()) {
            mSocketChannel.finishConnect();
            mSocketChannel.register(mSelector, SelectionKey.OP_READ);
            Log.d(TAG, mClientName + " 請求跟服務端建立連接配接");
        }
    }

    private void handleRead() throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int bytesRead = mSocketChannel.read(byteBuffer);
        if (bytesRead > 0) {
            String inMsg = new String(byteBuffer.array(), 0, bytesRead);
            Log.d(TAG, mClientName + " 收到服務端資料: " + inMsg);
        } else {
            Log.d(TAG, mClientName + "  斷開跟 服務端的連接配接");
            disconnectTcp();
        }
    }

    private void handleWrite() throws IOException {
        if (TextUtils.isEmpty(mSendMsg)) {
            return;
        }
        ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
        sendBuffer.put(mSendMsg.getBytes());
        sendBuffer.flip();

        mSocketChannel.write(sendBuffer);

        Log.d(TAG, "--------------------------------------");
        Log.d(TAG, mClientName + " 發送資料: " + mSendMsg);

        mSendMsg = null;
        mSocketChannel.register(mSelector, SelectionKey.OP_READ);
    }

    /**
     * 發送資料
     *
     * @param msg
     * @throws IOException
     */
    public void sendMsg(String msg) {
        if (mSelector == null || !mSelector.isOpen() || mSocketChannel == null || !mSocketChannel.isOpen()) {
            return;
        }
        try {
            mSendMsg = msg;
            mSocketChannel.register(mSelector, SelectionKey.OP_WRITE);
            mSelector.wakeup();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 斷開連接配接
     */
    public void disconnectTcp() {
        Log.d(TAG, "--------------------------------------");
        Log.d(TAG, mClientName + " 主動斷開跟服務端連接配接");

        close();
    }

    /**
     * 斷開連接配接
     */
    private void close() {
        try {
            if (mSelector != null && mSelector.isOpen()) {
                mSelector.close();
            }
            if (mSocketChannel != null && mSocketChannel.isOpen()) {
                mSocketChannel.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
           

TCPClient類對外就是對應三種按鈕事件:連接配接服務端requestConnectTcp、發送資料sendMsg、斷開連接配接disconnectTcp,基本上跟服務端TCPServer類的邏輯很像。

3.3 輸出日志

運作程式後,相應執行連接配接和斷開按鈕會能輸出以下日志:

2020-12-31 16:59:44.558 31789-31851/com.zyx.myapplication D/TCPClient**********: 用戶端A 請求跟服務端建立連接配接
2020-12-31 16:59:44.558 31789-31829/com.zyx.myapplication D/TCPServer----------: 服務端 同意 用戶端(/127.0.0.1:42250) 的連接配接請求

2020-12-31 16:59:47.315 31789-31789/com.zyx.myapplication D/ContentCapture: checkClickAndCapture, voiceRecorder=disable, collection=disable
2020-12-31 16:59:47.318 31789-31851/com.zyx.myapplication D/TCPClient**********: --------------------------------------
2020-12-31 16:59:47.318 31789-31851/com.zyx.myapplication D/TCPClient**********: 用戶端A 發送資料: 2_你好嗎?
2020-12-31 16:59:47.318 31789-31829/com.zyx.myapplication D/TCPServer----------: 服務端 收到 用戶端(/127.0.0.1:42250) 資料:2_你好嗎?
2020-12-31 16:59:47.318 31789-31829/com.zyx.myapplication D/TCPServer----------: 服務端 給 用戶端(/127.0.0.1:42250) 發送資料:2_你好!
2020-12-31 16:59:47.321 31789-31851/com.zyx.myapplication D/TCPClient**********: 用戶端A 收到服務端資料: 2_你好!

2020-12-31 16:59:49.689 31789-31789/com.zyx.myapplication D/ContentCapture: checkClickAndCapture, voiceRecorder=disable, collection=disable
2020-12-31 16:59:49.689 31789-31789/com.zyx.myapplication D/TCPClient**********: --------------------------------------
2020-12-31 16:59:49.689 31789-31789/com.zyx.myapplication D/TCPClient**********: 用戶端A 主動斷開跟服務端連接配接
2020-12-31 16:59:49.690 31789-31851/com.zyx.myapplication D/FlymeTrafficTracking: untag(69) com.zyx.myapplication client_connection_thread_pool uid 10472 5134ms
2020-12-31 16:59:49.692 31789-31829/com.zyx.myapplication D/TCPServer----------: 服務端 斷開跟 用戶端(/127.0.0.1:42250) 的連接配接
           

4 總結

好了,到此Socket的使用包括長連接配接、NIO都已認證上篇和本篇博文介紹完畢,有興趣的朋友可以将兩篇文章中的兩個Demo結合來搭建一個屬于自己長連接配接架構。

繼續閱讀