天天看點

java nio之selector

  一、selector簡介:選擇器提供選擇執行已經就緒的任務的能力.從底層來看,Selector提供了詢問通道是否已經準備好執行每個I/O操作的能力。Selector 允許一個單一的線程來操作多個 Channel。僅用單個線程來處理多個Channels的好處是,隻需要更少的線程來處理通道。事實上,可以隻用一個線程處理所有的通道,這樣會大量的減少線程之間上下文切換的開銷。

  二、選擇器的建立以及使用

  1)建立 

Selector selector = Selector.open();      

  2)注冊選擇器(Channel這裡不介紹)

socketChannel.configureBlocking(false);      
socketChannel.register(selector, SelectionKey.OP_READ)      

  注意:一個通道注冊到選擇器中,必須是非阻塞的。

  3)注冊模式有4種

SelectionKey.OP_CONNECT 
SelectionKey.OP_ACCEPT 
SelectionKey.OP_READ 
SelectionKey.OP_WRITE      

  4)SelectionKey的使用

  在選擇其中會存在多個選擇鍵SelectionKey,每一個選擇鍵的類型可能不一樣,是以我們這裡需要判定是哪一種類型

selector.selectedKeys() //擷取所有選擇鍵
selectionKey.isConnectable() //是否是連接配接選擇鍵
selectionKey.isReadable() //讀取
selectionKey.isWritable() //寫入
selectionKey.isAcceptable() //接收      

  擷取對應的選擇鍵過後可以強轉成對應的通信管道。(示例)

SocketChannel channel = (SocketChannel) selectionKey.channel();      

  三、聊天室的基本寫法(基本使用都在裡面)

  1)用戶端

package com.troy.nio.application;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class Client {

    //選擇器
    private static Selector selector;
    //通信管道
    private static SocketChannel socketChannel;
    public static void main(String[] args) {
        try {
            clientInit();
            listen();
            //發送資料
            while (true) {
                Thread.sleep(1000);
                socketChannel.write(ByteBuffer.wrap(("hello server!").getBytes()));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //初始化選擇器和發送資料
    private static void clientInit() throws Exception {
        //打開一個通道管理器
        selector = Selector.open();
        //擷取一個通信管道
        socketChannel = SocketChannel.open();
        //設定對應的發送位址和端口
        socketChannel.connect(new InetSocketAddress("localhost",9000));
        //設定非阻塞
        socketChannel.configureBlocking(false);
        //注冊一個寫入事件
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    //監聽伺服器傳回的資料
    private static void listen() throws Exception {
        Runnable runnable = () -> {
            while (true) {
                try {
                    //這裡會一直阻塞,直到事件過來
                    selector.select();
                    //在選擇器中擷取對應的注冊事件
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        //注冊事件
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        //判斷是否是讀事件
                        if (selectionKey.isReadable()) {
                            //擷取對應通信管道,并處理層資料
                            SocketChannel channel = (SocketChannel) selectionKey.channel();
                            //一次性讀取資料量,這裡應該做循環,我這裡友善沒有做
                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                            channel.read(byteBuffer);
                            byteBuffer.flip();
                            System.out.println(new String(byteBuffer.array()).trim());
                        }
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e.getMessage());
                }
            }
        };
        new Thread(runnable).start();
    }
}      

  2)服務端

package com.troy.nio.application;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class Server {

    //選擇器
    private static Selector selector;
    //服務端通信管道
    private static ServerSocketChannel channel;
    public static void main(String[] args) {
        try {
            serverInit();
            listen();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //初始化
    private static void serverInit() throws IOException {
        //打開一個選擇器
        selector = Selector.open();
        //打開一個服務端通信管道
        channel = ServerSocketChannel.open();
        //設定接收端口
        channel.socket().bind(new InetSocketAddress(9000));
        //設定非阻塞
        channel.configureBlocking(false);
        //注冊接收事件
        channel.register(selector, SelectionKey.OP_ACCEPT);
    }

    //監聽
    private static void listen() throws IOException {
        while (true) {
            //形成阻塞事件,接口完成後進行下一步
            selector.select();
            //擷取選擇器中的事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                iterator.remove();
                //判斷是否是接受事件
                if (selectionKey.isAcceptable()) {
                    SocketChannel socketChannel = channel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector,SelectionKey.OP_READ);
                }
                //是否是可讀事件
                if (selectionKey.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    //這裡的目的是當這個服務端一直存在,因為讀取資料存在異常,直接處理掉,下一個用戶端景來可以繼續接受
                    try {
                        socketChannel.read(byteBuffer);
                    } catch (Exception e) {
              selectionKey.cancel();
                        continue;
                    }
                    byteBuffer.flip();
                    System.out.println(new String(byteBuffer.array()).trim());
                    socketChannel.write(ByteBuffer.wrap("hello client!".getBytes()));
                }
            }
        }
    }
}