天天看點

java nio socket,Java----NioSocket的簡單使用

一、自己了解的概念

nioSocket(即new io socket)是一種同步非阻塞的I/O,其使用buffer來緩存資料和channel來傳輸資料,使用select來分揀消息。其使用的ServerSocketChannel和SocketChannel對應于之前學習的ServerSocket和Socket。

在我看的書(《看透Spring MVC源代碼分析與實踐》)中形容為:當今的快遞物流,現實中的快遞站點不會一件一件地發出快遞,而是在每一天的中午或者是一天的下午等時間即等待一段時間後将受到的快件一起發出去,這就像這裡要提到的NioSocket的工作原理,送貨員就像這裡的channel,将一段時間的一批貨一起送出去,這一批快件就針對于buffer,到一個中轉點分揀員分揀,将不同的貨分發給相應的地區相應的快遞員,分揀員的身份就相當于selector。

二、與普通socket的差別:

資料傳輸方面:

socket是直接使用輸入輸出流的方式直接讀,當然也可以選擇性的放在緩沖資料區中。

nioSocket隻能用buffer來進行資訊的擷取與發出!

異步特性上面

socket 在連接配接和讀寫是都是堵塞狀态的,即所線上程必須停住等待連接配接和資料的讀入及寫出,如果想在通信的時候可以開辟線程,一個線程對應一個socket連接配接,這樣不僅資源消耗太高,而且線程安全問題也不容小觑!

niosocket可以再連接配接、讀入、寫出等待時執行其他代碼,就像上面的例子中,快遞站點的從業人員不是一直有事,可以再等待快遞的時間裡幹别的事。

三、代碼實作

伺服器端

package AboutNioSocket.SimpleNioSocket;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.*;

import java.util.Iterator;

import java.util.Set;

public class NioSocketServer {

public static void main(String[] args) throws IOException, InterruptedException {

new NioSocketServer().start();

}

private void start() throws IOException, InterruptedException {

//使用靜态open方法,生成一個ServerSocketChannel對象

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

//設定其為非阻塞模式

serverSocketChannel.configureBlocking(false);

//擷取有個selector對象

Selector selector = Selector.open();

//注冊selector,第二個參數設定了其操作類型

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

//綁定接受請求的端口

serverSocketChannel.bind(new InetSocketAddress(8001));

//循環接受請求

while (true) {

//等待三秒,如果沒有請求就select方法傳回0,運作else中的需要一部運作的代碼

// 如果參數是0或沒有參數的話就一直阻塞直到接收到請求

if (selector.select(3000) != 0) {

//selectedKeys方法擷取SelectionKey的集合

//SelectionKey儲存了請求的channel和selector資訊

Set keys = selector.selectedKeys();

System.out.println("keys.length is " + keys.size());

Iterator iterator = keys.iterator();

while (iterator.hasNext()) {

SelectionKey key = iterator.next();

//擷取後移除這個已經處理的請求!

iterator.remove();

//如果該key所在的channel或者selector關閉了,這裡就會傳回true

//如果是接收請求操作

if (key.isAcceptable()) {

accept(key);

//如果是寫操作

} else if (key.isWritable()) {//

write(key);

//如果是讀取操作

} else if (key.isReadable()) {

read(key);

}

}

} else {

System.out.println();

Thread.sleep(800);

}

}

}

//nioSocket是通過緩流進行讀寫操作的,這裡先初始化好讀寫的緩沖流!

private ByteBuffer read = ByteBuffer.allocate(1024);

private ByteBuffer write = ByteBuffer.allocate(1024);

//這是接收到的字元串

private String getStr;

private void accept(SelectionKey key) throws IOException {

//這裡的socket還沒有注冊用戶端的channel,是以channel方法是擷取ServerSocketChannel的

ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

//而ServerSocketChannel的accept方法才是擷取連接配接的channel的!

SocketChannel socketChannel = serverSocketChannel.accept();

//擷取selector用的就是selector()方法

Selector selector = key.selector();

//也給其設定非阻塞模式

socketChannel.configureBlocking(false);

//注冊伺服器端的socket!本地分揀員能為用戶端的channel服務了!

socketChannel.register(selector, SelectionKey.OP_READ);

System.out.println("connect successfully");

}

private void write(SelectionKey key) throws IOException {

write.clear();

write.put(getStr.getBytes());

write.flip();

// selector已經注冊了用戶端的channel

// channel()方法擷取到的是發送請求的SocketChannel對象

SocketChannel channel = (SocketChannel) key.channel();

channel.configureBlocking(false);

channel.write(write);

Selector selector = key.selector();

//更換下一步的操作類型

channel.register(selector, SelectionKey.OP_READ);

}

private void read(SelectionKey key) throws IOException {

read.clear();

SocketChannel channel = (SocketChannel) key.channel();

channel.configureBlocking(false);

int num;

if ((num = channel.read(read)) == -1) {

System.out.println("未讀到資訊");

} else {

Selector selector = key.selector();

channel.register(selector, SelectionKey.OP_WRITE);

getStr = new String(read.array(), 0, num);

}

}

}

用戶端

package AboutNioSocket.SimpleNioSocket;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.*;

import java.util.Iterator;

import java.util.Set;

public class NioSocketServer {

public static void main(String[] args) throws IOException, InterruptedException {

new NioSocketServer().start();

}

private void start() throws IOException, InterruptedException {

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.configureBlocking(false);

Selector selector = Selector.open();

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

serverSocketChannel.bind(new InetSocketAddress(8001));

while (true) {

if (selector.select(3000) != 0) {

Set keys = selector.selectedKeys();

System.out.println("keys.length is " + keys.size());

Iterator iterator = keys.iterator();

while (iterator.hasNext()) {

SelectionKey key = iterator.next();

iterator.remove();

//如果該key所在的channel或者selector關閉了,這裡就會傳回true

if (key.isAcceptable()) {

accept(key);

} else if (key.isWritable()) {

write(key);

} else if (key.isReadable()) {

read(key);

}

}

} else {

System.out.println();

Thread.sleep(800);

}

}

}

private ByteBuffer read = ByteBuffer.allocate(1024);

private ByteBuffer write = ByteBuffer.allocate(1024);

private String getStr;

private void accept(SelectionKey key) throws IOException {

ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

SocketChannel socketChannel = serverSocketChannel.accept();

Selector selector = key.selector();

socketChannel.configureBlocking(false);

socketChannel.register(selector, SelectionKey.OP_READ);

System.out.println("connect successfully");

}

private void write(SelectionKey key) throws IOException {

write.clear();

write.put(getStr.getBytes());

write.flip();

SocketChannel channel = (SocketChannel) key.channel();

channel.configureBlocking(false);

channel.write(write);

Selector selector = key.selector();

channel.register(selector, SelectionKey.OP_READ);

}

private void read(SelectionKey key) throws IOException {

read.clear();

SocketChannel channel = (SocketChannel) key.channel();

channel.configureBlocking(false);

int num;

if ((num = channel.read(read)) == -1) {

System.out.println("未讀到資訊");

} else {

Selector selector = key.selector();

channel.register(selector, SelectionKey.OP_WRITE);

getStr = new String(read.array(), 0, num);

}

}

}

四、代碼流程解析

以伺服器端為例(用戶端原理差不多)

首先我們可以通過ServerSocketChannle的靜态open方法産生一個ServerSocketChannel對象(其對應于一個ServerSocket對象,可以使用其socket()方法産生,然後運用這個ServerSocket對象進行監聽,那還就是使用原來的socket了);有了ServerSocketChannel對象,使用configureBlocking(false)設定其為非阻塞模式,這樣就可以異步處理其他的代碼,可以在後面調用register方法注冊Selector了。注冊時會給其設定四種操作方法之一:

SelectionKey.OP_ACCEPT

SelectionKey.OP_CONNECT

SelectionKey.OP_WRITE

SelectionKey.OP_READ

代碼很簡單,基本的操作都寫到注釋中去了,這裡不做詳細說明

要知道的是Selector與Channel沒有主屬關系,沒有誰屬于誰的關系,一個Selector可以注冊多個Chennel,一個Channel能注冊多個Selector。Selector像快遞分揀員一樣能處理來自于不同地方的物流請求,Selector相比之下分的更細,他可以按照不同的類型分揀。分揀後的結果儲存在SelectionKey中,可以通過SelectionKey中的channel方法和selector方法擷取相應的channel和selector,通過isAcctable()、isConnectable()、isWritable()、isReadable()判斷請求屬于什麼操作!

五、Buffer緩沖流

我們可以看到每次進行寫操作時會調用clear(),讀取時使用flip()方法,兩個方法的具體操作在注釋中已經寫明,但這四個屬性都有什麼含義呢?

答:capacity:容量,在初始化時已經設定好,使用過程中不能改變!

limit:可以使用的範圍的上限,開始時預設和capacity的值相等。當我們寫入了一個字元串時,其大小就和該字元串的位元組數相等,我們最多能操作的索引上限也就是這個數值!

position:目前操作的索引值,從0開始,随着get和put方法的操作改變。

mark:暫時儲存position的位置,比如目前的position的值為10,我想現在通路15-20之間的内容,可以使用mark()方法儲存目前的position的值,然後通過position(int pos)方法設定position的值為15,通路完後調用reset()方法使position = mark!需要注意的是如果将position的值設定得比mark小,mark就會被還原為預設值-1.

四個屬性的大小:mark <= position <= limit <= capacity

後續更新!