First, let's look at how NIO differs from the BIO model that most of us already know.
The core of java.io is the stream (Stream).
Programming against java.io is stream-oriented, and a stream is one-directional: an input stream only reads and an output stream only writes; no stream does both. java.io is also blocking. Suppose our server is receiving an image and, because of a network problem, the client stalls halfway through the upload. The server's read method stays blocked the whole time, waiting for the client to send more data. Under this I/O model we need one thread per request, so a large number of connections means frequent context switches and can also lead to excessive memory use.
java.nio is block-oriented, or rather buffer-oriented. NIO has three basic concepts: Selector, Channel, and Buffer.
A Buffer is essentially an array underneath (for heap buffers), and both reads and writes go through a buffer.
Data is never read from or written to a Channel directly; it always passes through a buffer.
A Selector (sometimes translated as "multiplexer") is one of the core components of Java NIO. It checks whether one or more NIO Channels are ready for reading or writing. This lets a single thread manage multiple channels, that is, multiple network connections.
Let's start with Buffer.
Straight to the code:
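// imports: java.nio.IntBuffer, java.security.SecureRandom
public static void test1() {
    // Allocate an IntBuffer with a capacity of 3
    IntBuffer intBuffer = IntBuffer.allocate(3);
    // Fill the IntBuffer with random values in [0, 20)
    SecureRandom random = new SecureRandom();
    for (int i = 0; i < intBuffer.capacity(); i++) {
        intBuffer.put(random.nextInt(20));
    }
    // Flip: switch the buffer from writing to reading
    intBuffer.flip();
    // Read
    while (intBuffer.hasRemaining()) {
        System.out.println(intBuffer.get());
    }
}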
We first create an IntBuffer, fill it with random int values, flip it, and then start reading.
Many readers will ask: why do we have to flip before reading? Can I skip the flip? What happens if I do?
Let's step into intBuffer.flip() and find out:
public final Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
}
A buffer's capacity is the number of elements it contains. The
capacity of a buffer is never negative and never changes.
A buffer's limit is the index of the first element that should
not be read or written. A buffer's limit is never negative and is never
greater than its capacity.
A buffer's position is the index of the next element to be
read or written. A buffer's position is never negative and is never
greater than its limit.
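This is the description from the documentation of the java.nio.Buffer class.
In short:
- capacity is the size of the buffer. It is fixed when the buffer is created; with the IntBuffer.allocate(3) call above it is 3.
- limit is the index of the first element that must not be read or written. When the buffer is being drained into a channel it marks where the data ends; when the buffer is being filled from a channel it marks where the free space ends. For a newly allocated buffer, limit equals capacity.
- position is the index of the next element to be read or written. It is never greater than limit.
The three values always satisfy
0 <= position <= limit <= capacity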
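To make the three properties concrete, here is a small sketch that prints them after each step. The class name `BufferStateDemo` and its helper methods are made up for illustration; only the `IntBuffer` calls are real API.

```java
import java.nio.IntBuffer;

final class BufferStateDemo {
    // Formats a buffer's three properties as "position/limit/capacity".
    static String state(IntBuffer b) {
        return b.position() + "/" + b.limit() + "/" + b.capacity();
    }

    // Tries to move position past limit; the buffer refuses with an exception.
    static boolean rejectsPositionBeyondLimit(IntBuffer b) {
        try {
            b.position(b.limit() + 1);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        IntBuffer buf = IntBuffer.allocate(3);
        System.out.println(state(buf));                      // 0/3/3
        buf.put(7).put(8);                                   // each put advances position by 1
        System.out.println(state(buf));                      // 2/3/3
        buf.limit(2);                                        // limit may shrink, capacity may not
        System.out.println(state(buf));                      // 2/2/3
        System.out.println(rejectsPositionBeyondLimit(buf)); // true
    }
}
```

The last line shows that the invariant is enforced by the buffer itself rather than left to the caller.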
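With these three properties in mind, let's analyze flip():
- While we write to the buffer, each put advances position by 1 and capacity stays unchanged. When we finish writing and want to read, position points just past the last element written, not at the first one.
- If we started reading right away, there would be no valid element at position. So position has to go back to 0.
- flip() sets limit = position first, to mark where the written data ends. That is how a reader knows how much was written and where to stop.
Conclusion: after writing to a buffer, call flip() before reading from it. After reading, call clear() before writing to it again.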
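The effect of skipping flip() can be checked with a few lines. This is only a sketch: `FlipDemo` and `writeThenRead` are hypothetical names, and the buffer sizes are arbitrary.

```java
import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.List;

final class FlipDemo {
    // Puts the values into a fresh buffer, optionally flips it,
    // then reads everything between position and limit.
    static List<Integer> writeThenRead(int[] values, int capacity, boolean flip) {
        IntBuffer buf = IntBuffer.allocate(capacity);
        for (int v : values) {
            buf.put(v);
        }
        if (flip) {
            buf.flip();
        }
        List<Integer> out = new ArrayList<>();
        while (buf.hasRemaining()) {
            out.add(buf.get());
        }
        return out;
    }

    public static void main(String[] args) {
        // With flip: exactly the written values come back.
        System.out.println(writeThenRead(new int[] {5, 6}, 3, true));     // [5, 6]
        // Without flip: position is 2, limit is 3, so only the unwritten slot is read.
        System.out.println(writeThenRead(new int[] {5, 6}, 3, false));    // [0]
        // Without flip on a full buffer: position == limit, nothing is read at all.
        System.out.println(writeThenRead(new int[] {5, 6, 7}, 3, false)); // []
    }
}
```

In the second case the reader gets a default 0 that was never written, which is arguably worse than getting nothing.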
Channel
Here is the code:
// imports: java.io.*, java.nio.ByteBuffer, java.nio.channels.FileChannel, java.nio.charset.StandardCharsets
// Write
public static void test3() {
    // try-with-resources closes the stream even if an exception is thrown
    try (FileOutputStream fileOutputStream = new FileOutputStream("noiotest3.txt")) {
        FileChannel channel = fileOutputStream.getChannel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(128);
        byte[] msg = "hello nio !!!!".getBytes(StandardCharsets.UTF_8);
        byteBuffer.put(msg);
        // flip: limit = position, position = 0, so the channel drains exactly what was put
        byteBuffer.flip();
        while (byteBuffer.hasRemaining()) {
            channel.write(byteBuffer);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
// Read
public static void test2() {
    try (FileInputStream fileInputStream = new FileInputStream("niotest1.txt")) {
        FileChannel channel = fileInputStream.getChannel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(512);
        // Reads at most 512 bytes; a larger file needs a loop
        channel.read(byteBuffer);
        byteBuffer.flip();
        while (byteBuffer.hasRemaining()) {
            byte b = byteBuffer.get();
            // The cast is only meaningful for single-byte (ASCII) characters
            System.out.println("byte : " + (char) b);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
That is the code for writing and reading a file. We still start from the familiar FileOutputStream (or FileInputStream), but this time we obtain a Channel from the stream, allocate a buffer, call flip() as before, then write or read, and finally close the stream. Simple enough.
Here is another example:
public static void test4() throws Exception {
    FileInputStream inputStream = new FileInputStream("input.txt");
    FileOutputStream outputStream = new FileOutputStream("output.txt");
    FileChannel inputChannel = inputStream.getChannel();
    FileChannel outputChannel = outputStream.getChannel();
    ByteBuffer buffer = ByteBuffer.allocate(2);
    while (true) {
        // Reset position = 0 and limit = capacity
        // so that the buffer can be filled again in this iteration
        buffer.clear();
        // read() puts bytes into the buffer and advances position
        int len = inputChannel.read(buffer);
        System.out.println(len);
        if (-1 == len) {
            break;
        }
        // limit = position, position = 0: the buffer is ready to be drained
        buffer.flip();
        // write() advances position towards limit
        outputChannel.write(buffer);
    }
    inputChannel.close();
    outputChannel.close();
}
This example combines the two above, with one addition: buffer.clear().
As before we obtain the file channels and allocate a buffer. Then we loop: read, and leave the loop when read returns -1, which signals end of file. Finally we close the channels. Why call clear()? What does it actually do? The comment in the code already says it: it prepares the buffer for the next iteration. Let's look at the method:
public final Buffer clear() {
    position = 0;
    limit = capacity;
    mark = -1;
    return this;
}
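It works on the same properties: it resets position and limit to the values they had when the buffer was first created (capacity never changes). The data in the buffer is not erased.
What happens if we leave out buffer.clear()?
1. First iteration:
- inputChannel.read(buffer) fills the buffer.
- buffer.flip(), then outputChannel.write(buffer) writes the data normally.
2. Second iteration:
- inputChannel.read(buffer) is called while position == limit. Since position can never exceed limit, there is no room left and no data enters the buffer; read returns 0, not -1.
- buffer.flip() then exposes the old data again, and it is written to the file a second time.
The result is that the input is never consumed and the loop never terminates, while the same bytes are written over and over. So when reading in a loop, always remember buffer.clear().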
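The same loop can be tried without touching the file system. The sketch below swaps the file channels for in-memory channels created with `Channels.newChannel`; the class `ClearDemo` and both method names are invented for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

final class ClearDemo {
    // Copies input through a small buffer, calling clear() before every read.
    static byte[] copy(byte[] input, int bufferSize) {
        try {
            ReadableByteChannel in = Channels.newChannel(new ByteArrayInputStream(input));
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            WritableByteChannel out = Channels.newChannel(sink);
            ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
            while (true) {
                buffer.clear();                       // position = 0, limit = capacity
                if (in.read(buffer) == -1) {
                    break;                            // end of input
                }
                buffer.flip();                        // limit = position, position = 0
                while (buffer.hasRemaining()) {
                    out.write(buffer);
                }
            }
            return sink.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Runs one full iteration, skips clear(), and returns what the second read() reports.
    static int secondReadWithoutClear(byte[] input, int bufferSize) {
        try {
            ReadableByteChannel in = Channels.newChannel(new ByteArrayInputStream(input));
            WritableByteChannel out = Channels.newChannel(new ByteArrayOutputStream());
            ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
            in.read(buffer);
            buffer.flip();
            while (buffer.hasRemaining()) {
                out.write(buffer);                    // afterwards position == limit
            }
            return in.read(buffer);                   // no room in the buffer
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = "hello nio !!!!".getBytes(StandardCharsets.UTF_8);
        System.out.println(new String(copy(data, 2), StandardCharsets.UTF_8));
        System.out.println(secondReadWithoutClear(data, 2));
    }
}
```

`secondReadWithoutClear` returns 0 rather than -1, which is exactly why the loop without clear() never reaches its exit condition.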
Now for a concept that Netty is always boasting about: zero copy.
public static void test1() throws Exception {
    FileInputStream inputStream = new FileInputStream("input2.txt");
    FileOutputStream outputStream = new FileOutputStream("output2.txt");
    FileChannel inputChannel = inputStream.getChannel();
    FileChannel outputChannel = outputStream.getChannel();
    // The only change from the previous example: a direct (off-heap) buffer
    ByteBuffer buffer = ByteBuffer.allocateDirect(2);
    while (true) {
        // Reset position = 0 and limit = capacity
        // so that the buffer can be filled again in this iteration
        buffer.clear();
        // read() puts bytes into the buffer and advances position
        int len = inputChannel.read(buffer);
        System.out.println(len);
        if (-1 == len) {
            break;
        }
        // limit = position, position = 0: the buffer is ready to be drained
        buffer.flip();
        // write() advances position towards limit
        outputChannel.write(buffer);
    }
    inputChannel.close();
    outputChannel.close();
}
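Attentive readers will have noticed that the buffer is now created with allocateDirect(2) instead of allocate(2). What is the difference? "Direct" here means that the buffer is allocated in direct memory.
The two methods differ in where the buffer lives: allocate creates it on the Java heap, that is, inside the JVM, while allocateDirect creates it in off-heap memory.
- allocate creates the buffer inside the JVM heap
- allocateDirect creates the buffer in native memory obtained from the operating system
Why bother? With a heap buffer, operating-system I/O cannot work on the Java heap directly: data read from the network first lands in native memory and is then copied into the JVM. That is an extra memory copy. With small amounts of data you will hardly notice it, but with large volumes the cost is significant.
allocateDirect, by contrast, creates the buffer in memory outside the JVM heap. java.nio.Buffer has a long address field, which we can think of as pointing straight at the off-heap memory, so I/O works against that memory directly. Strictly speaking, a direct buffer saves the extra copy between native memory and the Java heap rather than eliminating copying altogether.
One more question: why doesn't the operating system simply access Java heap memory? Wouldn't that be much simpler?
As we all know, the JVM manages memory for us automatically. Take generational collection: the heap is divided into a young generation (with its survivor spaces) and an old generation, and the garbage collector moves objects between them, so their memory addresses change. If native code were operating on the data while the GC moved it, things would go badly wrong.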
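Whether a buffer is heap-based or direct can be inspected at runtime. A minimal sketch, with `DirectBufferDemo` and `describe` as illustrative names:

```java
import java.nio.ByteBuffer;

final class DirectBufferDemo {
    // Reports whether a buffer is direct and whether it exposes a backing byte[].
    static String describe(ByteBuffer b) {
        return "direct=" + b.isDirect() + ", hasArray=" + b.hasArray();
    }

    public static void main(String[] args) {
        // Heap buffer: backed by a byte[] that the garbage collector may move.
        System.out.println(describe(ByteBuffer.allocate(16)));       // direct=false, hasArray=true
        // Direct buffer: native memory outside the heap, no accessible byte[].
        System.out.println(describe(ByteBuffer.allocateDirect(16))); // direct=true, hasArray=false
    }
}
```

Because a direct buffer has no accessible array, code that calls `array()` on a buffer should check `hasArray()` first.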
忥å 个ä¾å
å建åçbuffer
/** slice buffer ä¸ åæ¥çbufferå
±äº«åºå±æ°ç»
* @author guoyitao
* @date 2019/3/18 21:07
* @params
* @return
*/
public static void test1SliceBuffer(){
ByteBuffer buffer = ByteBuffer.allocate(10);
for (int i = 0; i <buffer.capacity() ; i++) {
buffer.put((byte) i);
}
buffer.position(2);
buffer.limit(6);
//åçï¼å¹¶ä¸æ¯å¤å¶åºæ¥ç
ByteBuffer slice = buffer.slice();
for (int i = 0; i < slice.capacity(); i++) {
byte b = slice.get(i);
b *=2;
slice.put(i, b);
}
buffer.clear();
while (buffer.hasRemaining()){
System.out.println(buffer.get());
}
}
Creating a read-only buffer
/**
 * Creates a read-only buffer.
 * @author guoyitao
 * @date 2019/3/18 21:22
 */
public static void testOnlyBuffer() {
    ByteBuffer buffer = ByteBuffer.allocate(10);
    for (int i = 0; i < buffer.capacity(); i++) {
        buffer.put((byte) i);
    }
    ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
    // The two buffers are instances of different classes
    System.out.println(buffer.getClass());
    System.out.println(readOnlyBuffer.getClass());
    // Throws java.nio.ReadOnlyBufferException
    readOnlyBuffer.put("as".getBytes());
}
NIO network programming
Scattering and gathering with buffers
// imports: java.net.InetSocketAddress, java.nio.ByteBuffer, java.nio.channels.*, java.util.Arrays
public static void main(String[] args) throws Exception {
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    InetSocketAddress address = new InetSocketAddress(8899);
    serverSocketChannel.socket().bind(address);
    int messagelen = 2 + 3 + 4;
    ByteBuffer[] buffers = new ByteBuffer[3];
    buffers[0] = ByteBuffer.allocate(2);
    buffers[1] = ByteBuffer.allocate(3);
    buffers[2] = ByteBuffer.allocate(4);
    SocketChannel socketChannel = serverSocketChannel.accept();
    while (true) {
        int bytesRead = 0;
        while (bytesRead < messagelen) {
            // Scattering
            long read = socketChannel.read(buffers);
            if (read == -1) {
                // The client closed the connection; without this check the loop never ends
                socketChannel.close();
                serverSocketChannel.close();
                return;
            }
            bytesRead += read;
            System.out.println("bytesread: " + bytesRead);
            Arrays.stream(buffers)
                    .map(buffer -> "position:" + buffer.position() + ",limit: " + buffer.limit())
                    .forEach(System.out::println);
        }
        Arrays.asList(buffers).forEach(byteBuffer -> {
            byteBuffer.flip();
        });
        long bytesWritten = 0;
        while (bytesWritten < messagelen) {
            // Gathering
            long l = socketChannel.write(buffers);
            bytesWritten += l;
        }
        Arrays.asList(buffers).forEach(byteBuffer -> {
            byteBuffer.clear();
        });
        System.out.println("bytesRead: " + bytesRead + ",bytesWritten " + bytesWritten + ", messageLength: " + messagelen);
    }
}
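Ignore the socket setup for now and look only at the two places marked //Scattering and //Gathering.
- Scattering: data from one channel is read into several buffers in order. The first buffer is filled before the second one is touched, and so on.
- Gathering: several buffers are written to one channel in order. The first buffer is written out completely before the second one.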
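Scattering and gathering are not tied to sockets. As a sketch that runs without a client, the following uses a `FileChannel` on a temporary file instead of a `SocketChannel`; `ScatterGatherDemo`, `roundTrip`, and the header/body split are assumptions made for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ScatterGatherDemo {
    // Writes header and body with a gathering write, then reads them back
    // with a scattering read into two buffers of the same sizes.
    static String[] roundTrip(String header, String body) {
        try {
            Path file = Files.createTempFile("scatter-gather", ".bin");
            try (FileChannel ch = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                ByteBuffer[] out = {
                    ByteBuffer.wrap(header.getBytes(StandardCharsets.UTF_8)),
                    ByteBuffer.wrap(body.getBytes(StandardCharsets.UTF_8))
                };
                long total = out[0].remaining() + out[1].remaining();
                long written = 0;
                while (written < total) {
                    written += ch.write(out);         // gathering: out[0] first, then out[1]
                }

                ByteBuffer[] in = {
                    ByteBuffer.allocate(out[0].capacity()),
                    ByteBuffer.allocate(out[1].capacity())
                };
                ch.position(0);
                long read = 0;
                while (read < total) {
                    long n = ch.read(in);             // scattering: fills in[0], then in[1]
                    if (n == -1) {
                        break;
                    }
                    read += n;
                }

                String[] result = new String[in.length];
                for (int i = 0; i < in.length; i++) {
                    in[i].flip();
                    result[i] = StandardCharsets.UTF_8.decode(in[i]).toString();
                }
                return result;
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] parts = roundTrip("ab", "cde");
        System.out.println(parts[0] + " | " + parts[1]);
    }
}
```

This is the typical use case: a fixed-size header and a body can be kept in separate buffers and still be sent or received with a single call.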
More code:
// imports: java.io.IOException, java.net.InetSocketAddress, java.net.ServerSocket,
//          java.nio.ByteBuffer, java.nio.channels.*, java.util.Iterator, java.util.Set
public static void test1() throws IOException, InterruptedException {
    int[] ports = {5000, 5001, 5002, 5003, 5004};
    Selector selector = Selector.open();
    // System.out.println(SelectorProvider.provider().getClass());
    // System.out.println(sun.nio.ch.DefaultSelectorProvider.create().getClass());
    for (int i = 0; i < ports.length; i++) {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        ServerSocket socket = serverSocketChannel.socket();
        InetSocketAddress address = new InetSocketAddress(ports[i]);
        socket.bind(address);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("Listening on port " + ports[i]);
    }
    while (true) {
        int select = selector.select();
        System.out.println("number :" + select);
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        System.out.println("selectionKeys: " + selectionKeys);
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey selectionKey = iterator.next();
            // Remove the key from the selected set right away so it is not processed again
            iterator.remove();
            // Accept event
            if (selectionKey.isAcceptable()) {
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
                SocketChannel socketChannel = serverSocketChannel.accept();
                if (socketChannel == null) {
                    // In non-blocking mode accept() returns null if no connection is pending
                    continue;
                }
                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ);
                System.out.println("Accepted client connection: " + socketChannel);
            // Read event
            } else if (selectionKey.isReadable()) {
                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                int byteRead = 0;
                ByteBuffer byteBuffer = ByteBuffer.allocate(128);
                while (true) {
                    byteBuffer.clear();
                    int read = socketChannel.read(byteBuffer);
                    if (read == -1) {
                        // The peer closed the connection; closing the channel also cancels its key
                        socketChannel.close();
                        break;
                    }
                    if (read == 0) {
                        break;
                    }
                    byteBuffer.flip();
                    // Echo the data back to the client
                    socketChannel.write(byteBuffer);
                    byteRead += read;
                }
                System.out.println("Read " + byteRead + " bytes from " + socketChannel);
            }
        }
    }
}
Now let's walk through this "hello world" of NIO network programming.
A SelectionKey ties a Channel to a Selector and keeps track of the channel's events.
A key can be cancelled with the cancel method. A cancelled key is not removed from the selector immediately; it is added to the cancelled-key set (cancelledKeys) and removed during the next select operation. So before using a key, it is best to check it with isValid.
The event types:
OP_ACCEPT: a connection is ready to be accepted; supported only by ServerSocketChannel
OP_CONNECT: a connection is being established; supported on the client side
OP_READ/OP_WRITE: read and write events
Some of the methods:
- public abstract SelectableChannel channel(): returns the channel this key is associated with. It is still returned after the key has been cancelled.
- public abstract Selector selector(): returns the selector this key is associated with. It is still returned after the key has been cancelled.
- public abstract boolean isValid(): tells whether this key is valid. A key becomes invalid when it is cancelled, when its channel is closed, or when its selector is closed. AbstractSelector.removeKey(key) also marks the key as invalid.
- public abstract void cancel(): requests that the registration of this key be cancelled. Once the call returns, the key is invalid and has been added to the selector's cancelledKeys. The operation sets the key's valid flag to false and calls selector.cancel(key), which adds the key to the cancelled-key set.
- public abstract int interestOps(): returns this key's interest set.
- public abstract SelectionKey interestOps(int ops): sets this key's interest set to the given value. The value is checked against channel.validOps(); if ops contains an operation that the channel does not support, an exception is thrown.
- public abstract int readyOps(): returns this key's ready set, that is, the events that are currently ready on the channel.
- public final boolean isReadable(): tells whether this key is ready for a "read" event. Equivalent to (k.readyOps() & OP_READ) != 0. There are also isWritable(), isConnectable(), and isAcceptable().
- public final Object attach(Object ob): attaches the given object to this key. While the key is valid, the attachment can be passed along between different events.
- public final Object attachment(): returns the attachment. The attachment is shared for the lifetime of the channel (or rather, of the SelectionKey), but it is never sent over the network as socket data.
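Several of these methods can be exercised without any network code. The sketch below registers the read end of a `java.nio.channels.Pipe` with a selector in place of a socket; the class name `SelectionKeyDemo`, the method `inspect`, and the attachment string are made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class SelectionKeyDemo {
    // Registers a pipe's source channel and reports what the key says at each step.
    static String inspect() {
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);   // required before register()
                SelectionKey key = source.register(selector, SelectionKey.OP_READ, "session-1");

                StringBuilder sb = new StringBuilder();
                sb.append("interestIsRead=").append(key.interestOps() == SelectionKey.OP_READ);

                sink.write(ByteBuffer.wrap(new byte[] {42}));   // make the source readable
                selector.select(1000);
                sb.append(", readable=").append(key.isReadable());
                sb.append(", readyBit=").append((key.readyOps() & SelectionKey.OP_READ) != 0);
                sb.append(", attachment=").append(key.attachment());

                key.cancel();                      // invalid immediately, deregistered later
                sb.append(", validAfterCancel=").append(key.isValid());
                sb.append(", channelStillReturned=").append(key.channel() == source);
                return sb.toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(inspect());
    }
}
```

Note that `interestOps()` and `readyOps()` are called before `cancel()`: on a cancelled key they throw `CancelledKeyException`, which is why checking `isValid()` first is a good habit.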
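Walking through the multi-port example above:
- First we define an array that holds the port numbers.
- We create a Selector.
- In a loop we create several ServerSocketChannels and call configureBlocking(false) on each. Remember that this must be false: a channel has to be in non-blocking mode to be registered with a selector.
- We bind each port to the channel's socket and then register the serverSocketChannel with the Selector.
- We start an infinite loop. Once the server is up, it blocks in selector.select(), which keeps watching the channels until the state of one of them changes. When that happens, select() returns the number of selected keys.
- We get the selectionKeys set and use if statements to determine which event each selectionKey represents, then run the matching logic.
- After accepting, the new channel must itself be registered with the selector, with the event we want to handle next after OP_ACCEPT (here OP_READ). Then we call iterator.remove().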
Keep two things apart here. Channels are registered with the selector. In the polling loop, selector.select() first picks out the channels that are ready, and we then iterate over the SelectionKeys returned by selectedKeys(). One pass visits every SelectionKey in that set, and each key is removed with remove() once it has been handled. Removing a SelectionKey from selectedKeys does not remove the channel's registration from the selector (this point is important): every registered channel is stored as a SelectionKey in selector.keys(). In other words, after each pass the selectedKeys set should be left empty, while the entries in keys() stay where they are and keep recording the registered channels.
Then why remove at all? If an iterator only needs to visit the elements, there is no need to remove() them. The usual answer is "to avoid processing a key twice". The more precise reason is this: the selector never removes SelectionKey instances from the selected-key set by itself. You have to remove each key yourself once you have handled its channel, and the selector will add it again the next time that channel is ready. If you do not remove() it, the key from the previous round is still in selectedKeys() after the next select(). Suppose the channel is not ready this time: because the old key was never removed, the channel is treated as ready anyway, and that is bound to cause errors.
In short, selector.select() places the ready channels, as SelectionKeys, into the selector's selectedKeys() set for the caller to iterate over. The caller has to empty that set while processing it, so that the next selector.select() does not run into stale entries.
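The stale-key effect can be observed directly. The following sketch again uses a `Pipe` as a stand-in for a socket; `SelectedKeysDemo` and `staleKeyCounts` are illustrative names, not part of any library.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class SelectedKeysDemo {
    // Returns { size of selectedKeys after a second select without removing the key,
    //           size of selectedKeys after clearing the set and selecting again }.
    static int[] staleKeyCounts() {
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);

                sink.write(ByteBuffer.wrap(new byte[] {1}));
                selector.select(1000);               // the source is readable now
                source.read(ByteBuffer.allocate(8)); // consume the data, but do NOT remove the key

                selector.selectNow();                // nothing is ready any more
                int withoutRemove = selector.selectedKeys().size();

                selector.selectedKeys().clear();     // what iterator.remove() does, key by key
                selector.selectNow();
                int afterClear = selector.selectedKeys().size();

                return new int[] {withoutRemove, afterClear};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] counts = staleKeyCounts();
        System.out.println("without remove: " + counts[0] + ", after clear: " + counts[1]);
    }
}
```

The first count stays at 1 even though the channel has nothing left to read, so a loop that skips the removal would keep handling a channel that is not actually ready.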
Finally, one last demo:
A chat client and server (group chat)
Server
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Each SelectionKey is removed from the selected set so that it is not
 * processed again on the next select().
 * @Description: group chat server
 * @Author: guo
 * @CreateDate: 2019/3/19
 * @UpdateUser:
 */
public class NIOServer {
    public static final int serverPort = 5001;
    public static final Map<String, SocketChannel> clientMap = new ConcurrentHashMap<>();

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // Non-blocking
        serverSocketChannel.configureBlocking(false);
        // Bind the server socket
        serverSocketChannel.socket().bind(new InetSocketAddress(serverPort));
        // Register interest in accept events
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("Server port: " + serverPort);
        while (true) {
            // Wait for events
            selector.select();
            // Iterate over the keys of the ready channels
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                // Remove exactly once per key, whichever branch runs below
                iterator.remove();
                // Check validity first: the isXxx() methods throw on a cancelled key
                if (!selectionKey.isValid()) {
                    continue;
                }
                if (selectionKey.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
                    // Accept the connection made to this channel's socket: a new user
                    SocketChannel client = server.accept();
                    if (client == null) {
                        continue;
                    }
                    // Non-blocking
                    client.configureBlocking(false);
                    // Once connected, wait for read events
                    client.register(selector, SelectionKey.OP_READ);
                    // Remember the connected user
                    clientMap.put(UUID.randomUUID().toString(), client);
                } else if (selectionKey.isReadable()) {
                    handleRead(selectionKey);
                }
            }
        }
    }

    // Reads everything currently available from one client and broadcasts it
    private static void handleRead(SelectionKey selectionKey) {
        SocketChannel client = (SocketChannel) selectionKey.channel();
        // Which user does this channel belong to?
        String fromKey = null;
        for (Map.Entry<String, SocketChannel> channelEntry : clientMap.entrySet()) {
            if (channelEntry.getValue() == client) {
                fromKey = channelEntry.getKey();
                break;
            }
        }
        ByteBuffer readBuffer = ByteBuffer.allocateDirect(1024);
        while (true) {
            readBuffer.clear();
            int read;
            try {
                read = client.read(readBuffer);
            } catch (IOException e) {
                // The client closed abnormally, e.g. java.io.IOException:
                // "An existing connection was forcibly closed by the remote host"
                e.printStackTrace();
                read = -1;
            }
            if (read == -1) {
                // The closed SocketChannel must also be dropped from clientMap;
                // otherwise a later broadcast may throw java.nio.channels.ClosedChannelException
                disconnect(selectionKey, client, fromKey);
                return;
            }
            if (read == 0) {
                return;
            }
            readBuffer.flip();
            // Decode the bytes
            String receiveData = StandardCharsets.UTF_8.decode(readBuffer).toString();
            System.out.println("Server received " + receiveData + ", from: " + client);
            // Broadcast to every connected client
            byte[] payload = (fromKey + receiveData).getBytes(StandardCharsets.UTF_8);
            for (SocketChannel target : clientMap.values()) {
                try {
                    target.write(ByteBuffer.wrap(payload));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static void disconnect(SelectionKey key, SocketChannel client, String fromKey) {
        key.cancel();
        if (fromKey != null) {
            clientMap.remove(fromKey);
        }
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Client
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @Description: group chat client
 * @Author: guo
 * @CreateDate: 2019/3/19
 * @UpdateUser:
 */
public class NIOClient {
    public static void main(String[] args) throws IOException {
        // Create a SocketChannel
        SocketChannel socketChannel = SocketChannel.open();
        // Non-blocking
        socketChannel.configureBlocking(false);
        // Create the selector
        Selector selector = Selector.open();
        // Register interest in the connect event
        socketChannel.register(selector, SelectionKey.OP_CONNECT);
        // Connect to the server
        socketChannel.connect(new InetSocketAddress("127.0.0.1", NIOServer.serverPort));
        while (true) {
            // Wait for events
            selector.select();
            // Get the keys of the ready channels
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                // Is this an OP_CONNECT event?
                if (selectionKey.isConnectable()) {
                    SocketChannel client = (SocketChannel) selectionKey.channel();
                    if (client.isConnectionPending()) {
                        // Finish establishing the connection
                        client.finishConnect();
                        // Announce the successful connection
                        client.write(ByteBuffer.wrap(
                                (LocalDateTime.now() + " connected").getBytes(StandardCharsets.UTF_8)));
                        // A single-threaded executor that forwards keyboard input to the server
                        ExecutorService executorService = Executors.newSingleThreadExecutor();
                        executorService.submit(() -> {
                            // Standard input, i.e. the keyboard
                            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
                            try {
                                String line;
                                // readLine() returns null at end of input
                                while ((line = reader.readLine()) != null) {
                                    // Send the line from the client to the server
                                    client.write(ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8)));
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        });
                    }
                    // From now on we are interested in read events
                    client.register(selector, SelectionKey.OP_READ);
                } else if (selectionKey.isReadable()) {
                    // Receive data
                    SocketChannel client = (SocketChannel) selectionKey.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    int read = client.read(byteBuffer);
                    if (read > 0) {
                        String receiveData = new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8);
                        System.out.println("Received message: " + receiveData);
                    } else if (read == -1) {
                        // The server closed the connection
                        client.close();
                        return;
                    }
                }
            }
            // Empty the selected-key set, as explained earlier
            selectionKeys.clear();
        }
    }
}