天天看點

Java 并發程式設計-NIO 簡明教程

問題來源

在傳統的架構中,對于用戶端的每一次請求,伺服器都會建立一個新的線程或者利用線程池複用去處理使用者的一個請求,然後傳回給使用者結果,這樣做在高并發的情況下會存在非常嚴重的性能問題:對于使用者的每一次請求都建立一個新的線程是需要一定記憶體的,同時線程之間頻繁的上下文切換也是一個很大的開銷。

p.s: 本文涉及的完整執行個體代碼都可以在我的GitHub上面下載下傳。

什麼是Selector

NIO的核心就是Selector,讀懂了Selector就了解了異步機制的實作原理,下面先來簡單的介紹一下什麼是Selector。現在對于用戶端的每一次請求到來時我們不再立即建立一個線程進行處理,相反以epool為例子當一個事件準備就緒之後通過回調機制将描述符加入到阻塞隊列中,下面隻需要通過周遊阻塞隊列對相應的事件進行處理就行了,通過這種回調機制整個過程都不需要對于每一個請求都去建立一個線程去單獨處理。上面的解釋還是有些抽象,下面我會通過具體的代碼執行個體來解釋,在這之前我們先來了解一下NIO中兩個基礎概念Buffer和Channel。

如果大家對于多路IO複用比如select/epool完全陌生的話,建議先讀一下我的這篇Linux下的五種IO模型 :-)

Buffer

以ByteBuffer為例子,我們可以通過ByteBuffer.allocate(n)來配置設定n個位元組的緩沖區,對于緩沖區有四個重要的屬性:

  1. capacity,緩沖區的容量,也就是我們上面指定的n。
  2. position,目前指針指向的位置。
  3. mark,前一個位置,這裡我們下面再解釋。
  4. limit,最大能讀取或者寫入的位置。

如上圖所示,Buffer實際上也是分為兩種,一種用于寫資料,一種用于讀取資料。

put

通過直接閱讀ByteBuffer源碼可以清晰看出put方法是把一個byte變量x放到緩沖區中去,同時position加1:

1

2

3

4

5

6

7

8

9

public

ByteBuffer put(

byte

x) {

hb[ix(nextPutIndex())] = x;

return

this

;

}

final

int

nextPutIndex() {

if

(position >= limit)

throw

new

BufferOverflowException();

return

position++;

}

get

get方法是從緩沖區中讀取一個位元組,同時position加一:

public

byte

get() {

return

hb[ix(nextGetIndex())];

}

final

int

nextGetIndex() {

if

(position >= limit)

throw

new

BufferUnderflowException();

return

position++;

}

flip

如果我們想将buffer從寫資料的情況變成讀資料的情況,可以直接使用flip方法:

public

final

Buffer flip() {

limit = position;

position =

;

mark = -

1

;

return

this

;

}

mark和reset

mark是記住目前的位置用的,也就是儲存position的值:

public

final

Buffer mark() {

mark = position;

return

this

;

}

如果我們在對緩沖區讀寫之前就調用了mark方法,那麼以後當position位置變化之後,想回到之前的位置可以調用reset會将mark的值重新賦給position:

public

final

Buffer reset() {

int

m = mark;

if

(m <

)

throw

new

InvalidMarkException();

position = m;

return

this

;

}

Channel

利用NIO,當我們讀取資料的時候,會先從buffer加載到channel,而寫入資料的時候,會先入到channel然後通過channel轉移到buffer中去。channel給我們提供了兩個方法:通過channel.read(buffer)可以将channel中的資料寫入到buffer中,而通過channel.write(buffer)則可以将buffer中的資料寫入到到channel中。

Channel的話分為四種:

  1. FileChannel從檔案中讀寫資料。
  2. DatagramChannel以UDP的形式從網絡中讀寫資料。
  3. SocketChannel以TCP的形式從網絡中讀寫資料。
  4. ServerSocketChannel允許你監聽TCP連接配接。

因為今天我們的重點是Selector,是以來看一下SocketChannel的用法。在下面的代碼利用SocketChannel模拟了一個簡單的server-client程式。

WebServer的代碼如下,和傳統的sock程式并沒有太多的差異,隻是我們引入了buffer和channel的概念:

10

11

ServerSocketChannel ssc = ServerSocketChannel.open();

ssc.socket().bind(

new

InetSocketAddress(

"127.0.0.1"

,

5000

));

SocketChannel socketChannel = ssc.accept();

ByteBuffer readBuffer = ByteBuffer.allocate(

128

);

socketChannel.read(readBuffer);

readBuffer.flip();

while

(readBuffer.hasRemaining()) {

System.out.println((

char

)readBuffer.get());

}

socketChannel.close();

ssc.close();

WebClient的代碼如下:

SocketChannel socketChannel =

null

;

socketChannel = SocketChannel.open();

socketChannel.connect(

new

InetSocketAddress(

"127.0.0.1"

,

5000

));

ByteBuffer writeBuffer = ByteBuffer.allocate(

128

);

writeBuffer.put(

"hello world"

.getBytes());

writeBuffer.flip();

socketChannel.write(writeBuffer);

socketChannel.close();

Scatter / Gather

在上面的client程式中,我們也可以同時将多個buffer中的資料放入到一個數組後然後統一放入到channel後傳遞給伺服器:

ByteBuffer buffer1 = ByteBuffer.allocate(

128

);

ByteBuffer buffer2 = ByteBuffer.allocate(

16

);

buffer1.put(

"hello "

.getBytes());

buffer2.put(

"world"

.getBytes());

buffer1.flip();

buffer2.flip();

ByteBuffer[] bufferArray = {buffer1, buffer2};

socketChannel.write(bufferArray);

Selector

通過使用selector,我們可以通過一個線程來同時管理多個channel,省去了建立線程以及線程之間進行上下文切換的開銷。

建立一個selector

通過調用selector類的靜态方法open我們就可以建立一個selector對象:

Selector selector = Selector.open();

注冊channel

為了保證selector能夠監聽多個channel,我們需要将channel注冊到selector當中:

channel.configureBlocking(

false

);

SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

我們可以監聽四種事件:

  1. SelectionKey.OP_CONNECT:當用戶端的嘗試連接配接到伺服器
  2. SelectionKey.OP_ACCEPT:當伺服器接受來自用戶端的請求
  3. SelectionKey.OP_READ:當伺服器可以從channel中讀取資料
  4. SelectionKey.OP_WRITE:當伺服器可以向channel中寫入資料

對SelectorKey調用channel方法可以得到key對應的channel:

Channel channel = key.channel();

而key自身感興趣的監聽事件也可以通過interestOps來獲得:

int

interestSet = selectionKey.interestOps();

對selector調用selectedKeys()方法我們可以得到注冊的所有key:

Set<SelectionKey> selectedKeys = selector.selectedKeys();

實戰

伺服器的代碼如下:

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

ServerSocketChannel ssc = ServerSocketChannel.open();

ssc.socket().bind(

new

InetSocketAddress(

"127.0.0.1"

,

5000

));

ssc.configureBlocking(

false

);

Selector selector = Selector.open();

ssc.register(selector, SelectionKey.OP_ACCEPT);

ByteBuffer readBuff = ByteBuffer.allocate(

128

);

ByteBuffer writeBuff = ByteBuffer.allocate(

128

);

writeBuff.put(

"received"

.getBytes());

writeBuff.flip();

// make buffer ready for reading

while

(

true

) {

selector.select();

Set<SelectionKey> keys = selector.selectedKeys();

Iterator<SelectionKey> it = keys.iterator();

while

(it.hasNext()) {

SelectionKey key = it.next();

it.remove();

if

(key.isAcceptable()) {

SocketChannel socketChannel = ssc.accept();

socketChannel.configureBlocking(

false

);

socketChannel.register(selector, SelectionKey.OP_READ);

}

else

if

(key.isReadable()) {

SocketChannel socketChannel = (SocketChannel) key.channel();

readBuff.clear();

// make buffer ready for writing

socketChannel.read(readBuff);

readBuff.flip();

// make buffer ready for reading

System.out.println(

new

String(readBuff.array()));

key.interestOps(SelectionKey.OP_WRITE);

}

else

if

(key.isWritable()) {

writeBuff.rewind();

// sets the position back to 0

SocketChannel socketChannel = (SocketChannel) key.channel();

socketChannel.write(writeBuff);

key.interestOps(SelectionKey.OP_READ);

}

}

}

用戶端程式的代碼如下,各位讀者可以同時在終端下面多開幾個程式來同時模拟多個請求,而對于多個用戶端的程式我們的伺服器始終隻用一個線程來處理多個請求。一個很常見的應用場景就是多個使用者同時往伺服器上傳檔案,對于每一個上傳請求我們不在單獨去建立一個線程去處理,同時利用Executor/Future我們也可以不用阻塞在IO操作中而是立即傳回使用者結果。

SocketChannel socketChannel = SocketChannel.open();

socketChannel.connect(

new

InetSocketAddress(

"127.0.0.1"

,

5000

));

ByteBuffer writeBuffer = ByteBuffer.allocate(

32

);

ByteBuffer readBuffer = ByteBuffer.allocate(

32

);

writeBuffer.put(

"hello"

.getBytes());

writeBuffer.flip();

// make buffer ready for reading

while

(

true

) {

writeBuffer.rewind();

// sets the position back to 0

socketChannel.write(writeBuffer);

// hello

readBuffer.clear();

// make buffer ready for writing

socketChannel.read(readBuffer);

// recieved

}

熬夜不易,點選請老王喝杯烈酒!!!!!!!