功能
1)編寫一個 NIO 群聊系統,實作伺服器端和用戶端之間的資料簡單通訊(非阻塞)
2)實作多人群聊
3)伺服器端:可以監測使用者上線,離線,并實作消息轉發功能
4)用戶端:通過channel 可以無阻塞發送消息給其它所有使用者,同時可以接受其它使用者發送的消息(由伺服器轉發得到)
基本知識
三種網絡I/O
BIO
同步阻塞 IO,Block IO,IO 操作時會阻塞線程,并發處理能力低。
Socket 程式設計就是 BIO,一個 socket 連接配接一個處理線程(這個線程負責這個 Socket 連接配接的一系列資料傳輸操作)。阻塞的原因在于:作業系統允許的線程數量是有限的,多個 socket 申請與服務端建立連接配接時,服務端不能提供相應數量的處理線程,沒有配置設定到處理線程的連接配接就會阻塞等待或被拒絕。
BIO方式适用于連接配接數目比較小且固定的架構,這種方式對伺服器資源要求比較高,并發局限于應用中,JDK1.4以前的唯一選擇,但程式簡單易了解。
NIO
同步非阻塞 IO,None-Block IO
NIO 是對 BIO 的改進,基于 Reactor 模型。我們知道,一個 socket 連接配接隻有在特定時候才會發生資料傳輸 IO 操作,大部分時間這個“資料通道”是空閑的,但還是占用着線程。
NIO 作出的改進就是“一個請求一個線程”,在連接配接到服務端的衆多 socket 中,隻有需要進行 IO 操作的才能擷取服務端的處理線程進行 IO。這樣就不會因為線程不夠用而限制了 socket 的接入
AIO
異步非阻塞 IO
這種 IO 模型是由作業系統先完成了用戶端請求處理再通知伺服器去啟動線程進行處理。AIO 也稱 NIO2.0,在 JDK7開始支援
AIO 引入異步通道的概念,采用了 Proactor 模式,簡化了程式編寫,有效的請求才啟動線程,它的特點是先由作業系統完成後才通知服務端程式啟動線程去處理,一般适用于連接配接數較多且連接配接時間較長的應用
NIO的三大核心元件
Channel(通道)
NIO的通道類似于流,但有些差別
- 通道可以同時進行讀寫,而流隻能讀或者隻能寫
- 通道可以實作異步讀寫資料
- 通道可以從緩沖讀資料,也可以寫資料到緩沖
- 通道是一個接口,有多種通道類型實作
Buffer(緩沖區)
緩沖區本質上是一個可以讀寫資料的記憶體塊,可以了解成是一個容器對象(含數組),該對象提供了一組方法,可以更輕松地使用記憶體塊,緩沖區對象内置了一些機制,能夠跟蹤和記錄緩沖區的狀态變化情況。Channel 提供從檔案、網絡讀取資料的管道,但是讀取或寫入的資料都必須經由 Buffer
Selector(選擇器)
用一個線程處理多個用戶端連接配接時會用到selector
Selector 能夠檢測多個注冊的通道上是否有事件發生(注意:多個Channel以事件的方式可以注冊到同一個Selector),如果有事件發生,便擷取事件然後針對每個事件進行相應的處理。這樣就可以隻用一個單線程去管理多個通道,也就是管理多個連接配接和請求
隻有在 連接配接/通道 真正有讀寫事件發生時,才會進行讀寫,就大大地減少了系統開銷,并且不必為每個連接配接都建立一個線程,不用去維護多個線程。避免了多線程之間的上下文切換導緻的開銷
NIO三大核心元件的關系

實作時的一些細節
資訊以byte形式高效傳送
不使用傳統的json等資料結構傳輸資訊,而是使用bytebuffer,每次發送一個bytebuffer
TCP沾包、半包問題
産生原因
- 應用程式寫入的資料大于套接字緩沖區大小,這将會發生拆包。
- 應用程式寫入資料小于套接字緩沖區大小,網卡将應用多次寫入的資料發送到網絡上,這将會發生粘包。
- 進行MSS(最大封包長度)大小的TCP分段,當TCP封包長度-TCP頭部長度>MSS的時候将發生拆包。
- 接收方法不及時讀取套接字緩沖區資料,這将發生粘包。
為什麼沾包、半包會導緻異常
将byte轉換回字元串時,會先讀取整個buffer,然後再使用byte數組生成字元串
- 如果隻接收一部分可能會導緻中間某一個字元隻接收了一部分byte,是以解碼的時候會産生異常。
- 如果接收多個bytebuffer,雖然不會産生異常,但如果中間有一些自定義的分隔符類的東西以标志不同用途會導緻解碼錯誤。
沾包半包問題共有三種情況
- 全沾包:多個完整的bytebuffer組成
- 半包:隻有一個完整的bytebuffer的一部分
- 全包+半包:有多個完整的bytebuffer和一個不完整的bytebuffer組成
沾包、半包問題的解決方案
TCP本身是面向流的,作為網絡伺服器,如何從這源源不斷湧來的資料流中拆分出或者合并出有意義的資訊呢?通常會有以下一些常用的方法:
- 發送端給每個資料包添加包首部,首部中應該至少包含資料包的長度,這樣接收端在接收到資料後,通過讀取包首部的長度字段,便知道每一個資料包的實際長度了。
- 發送端将每個資料包封裝為固定長度(不夠的可以通過補0填充),這樣接收端每次從接收緩沖區中讀取固定長度的資料就自然而然的把每個資料包拆分開來。(比較浪費空間)
- 可以在資料包之間設定邊界,如添加特殊符号,這樣,接收端通過這個邊界就可以将不同的資料包拆分開。(存在文字中包含要分隔的特殊符号,導緻分隔錯誤的情況)
server端
server端隻建立一個線程用以處理所有邏輯,實際上如果需要更高的效率可以添加不同的線程。
監聽
啟動server端後,server端開始監聽,監聽到請求則開始處理,處理完成後繼續監聽。
監聽的核心邏輯如下
// 開始監聽所有請求
public void listen(){
try{
// 循環周遊是否有事件
while (true){
// 阻塞0.1s看是否有用戶端連接配接,防止CPU全速空轉
if(this.selector.select(100)==0){
continue;
}
// 取得selectionkey,判斷通道裡的事件
Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey selectedKey = iterator.next();
if(selectedKey.isAcceptable()){
// 如果是連接配接事件,則給用戶端生成一個channel
SocketChannel socketChannel = this.serverSocketChannel.accept();
socketChannel.configureBlocking(false);
// 向selector注冊
socketChannel.register(this.selector, SelectionKey.OP_READ);
// 已經登入完成,接下來調用登入處理邏輯
this.login(socketChannel);
}else if(selectedKey.isReadable()){
// 收到發送來的資料,啟動資料處理邏輯
readData(selectedKey);
}
iterator.remove();
}
}
}catch (Exception e){
e.printStackTrace();
}
}
資料從buffer中讀出
由于用戶端可能發送來的資料較大,而且我初始化的buffer讀取每次最多讀1024byte,是以會嘗試循環讀取直到不能讀出資料再結束
/**
* 讀取消息邏輯
* @param selectionKey
*/
private void readData(SelectionKey selectionKey){
try{
// 通過key反向得到channel
SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
// 建立讀取資料的緩沖區
ByteBuffer byteBuffer = this.msgBuffer;
// 首先清空buffer
byteBuffer.clear();
// 讀取資料,并記錄資料大小
int count = socketChannel.read(byteBuffer);
if(count>0){
System.out.println("get msg: from"+socketChannel.getRemoteAddress().toString());
// 防止消息資訊太大出現需要多次讀取
while (count>0){
sendData2All(socketChannel, byteBuffer);
byteBuffer.clear();
count = socketChannel.read(byteBuffer);
}
}else if(count==-1){
this.logout((SocketChannel)selectionKey.channel());
}
}catch (Exception e){
try {
this.logout((SocketChannel)selectionKey.channel());
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
client端
client端共有兩個線程,一個線程用來監聽伺服器發來的資訊,一個線程用于從控制台讀取輸入。
處理沾包、半包問題
選擇在所有的資訊前面添加資料長度選項的方法為了處理沾包、半包、拆包帶來的問題。
發送
為了防止由于半包問題導緻表示資訊長度的int的4個byte被拆分成兩部分,并且拆包後包資料大小一般都為4的整數倍,是以我們對資訊進行處理,如果資料的byte數組長度不是4的倍數,則将byte數組長度補足為4的倍數。
/**
* 将資訊發送出去,為了防止沾包和半包的問題,會首先發送長度資訊,然後再發送具體資訊
* @param msgs
* @return 傳回資訊狀态,-1表示退出,0表示正常發送
* @throws IOException
*/
public int sendMsg(String msgs) throws IOException {
if("exit".equals(msgs)){
return -1;
}else{
String baseBlank = "aaaa"; // 先填充四個無意義字元,後續用于填充進去字元串長度
// 用來存儲資料的byte數組
byte[] msgBytes;
int flag;
// 如果有多行資料則分開發送
String[] splitedData = msgs.split("\n");
for (String msg : splitedData) {
msgBytes = (baseBlank + this.username + ":" + msg).getBytes();
// 用以保證所有發送的資訊長度都為4的倍數
flag = msgBytes.length % 4;
if(flag>0){
msgBytes = java.util.Arrays.copyOf(msgBytes, msgBytes.length + 4 - flag);
}
// 将消息長度資訊寫入消息體中
System.arraycopy(int2Bytes(msgBytes.length-4), 0, msgBytes, 0, 4);
// 發送
this.socketChannel.write(ByteBuffer.wrap(msgBytes));
}
return 0;
}
}
接收
為了處理半包、沾包問題,從buffer中讀取資料需要進行額外的處理,具體邏輯在代碼中都有注釋。
關鍵在于資訊長度的int資料的擷取,和沾包問題帶來的一個buffer讀多次的問題。
/**
* 讀取資料。讀取資料時需要處理沾包和半包的問題
* @param selectedKey
* @throws IOException
*/
public void readData(SelectionKey selectedKey) throws IOException {
SocketChannel channel = (SocketChannel)selectedKey.channel();
ByteBuffer byteBuffer = this.msgBuffer;
// 首先清空緩存
byteBuffer.clear();
// 首先從channel中讀取一部分資料到buffer
int count = channel.read(byteBuffer);
// 将buffer從寫狀态轉換為讀狀态
byteBuffer.flip();
// 首先擷取資訊的長度
int msgLen = byteBuffer.getInt();
int index = 0;
int startOffset = 4;
int getDataLen;
// 隻要還有資料就一直讀取,隻有在讀取完一個階段後才會執行其他任務
while (count>0 || msgLen!=0){ // 隻有讀完所有完整的資訊才算結束,出現半包則繼續等待
// 剛開始接收資訊要首先擷取int長度,共4個byte
if(index==0) {
index++;
getDataLen = Math.min(count - startOffset, msgLen);
byteArrayOutputStream.write(byteBuffer.array(), startOffset, getDataLen);
// 判斷目前是否讀完了一條資訊
if(getDataLen==msgLen){ // 已經讀完了一條資訊
// 判斷讀完這條資訊後是否把整個buffer讀完了,防止出現沾包的情況
if(msgLen==(count - startOffset)){ // 讀完這條資訊後把整個buffer讀完了,也就是沒有出現沾包的情況
byteBuffer.clear();
if(byteArrayOutputStream.size()>0)
this.readMsg(channel, byteArrayOutputStream);
// 初始化資訊,防止之後還有連續的資訊需要讀取,但環境不正确的問題
index = 0;
startOffset = 4;
msgLen -= getDataLen;
// 如果之後還有資訊則繼續讀
byteBuffer.clear();
count = channel.read(byteBuffer);
// 将position置為開始
byteBuffer.rewind();
if(count>0){
msgLen = byteBuffer.getInt();
}
}else{ // 出現了沾包
// 首先将目前資訊發送給所有用戶端
if(byteArrayOutputStream.size()>0)
this.readMsg(channel, byteArrayOutputStream);
// 然後擷取下一條資訊的長度資訊,并開始繼續寫入
// 首先擷取新的一條資訊的開始偏移位置
byteBuffer.position(startOffset+getDataLen);
// 計算資料真正的開始位置
startOffset = startOffset+getDataLen+4;
// 擷取資訊長度
msgLen = byteBuffer.getInt();
index = 0;
if(startOffset==count){
index++;
byteBuffer.clear();
count = channel.read(byteBuffer);
// 将position置為開始
byteBuffer.rewind();
}
}
}else{ // 目前資訊還沒有讀完,繼續讀
byteBuffer.clear();
msgLen -= getDataLen;
count = channel.read(byteBuffer);
// 将position置為開始
byteBuffer.rewind();
}
}else{
getDataLen = Math.min(count, msgLen);
byteArrayOutputStream.write(byteBuffer.array(), 0, getDataLen);
// 判斷目前是否讀完了一條資訊
if(getDataLen==msgLen){ // 已經讀完了一條資訊
// 判斷讀完這條資訊後是否把整個buffer讀完了,防止出現沾包的情況
if(msgLen==count){ // 讀完這條資訊後把整個buffer讀完了,也就是沒有出現沾包的情況
if(byteArrayOutputStream.size()>0)
this.readMsg(channel, byteArrayOutputStream);
// 初始化資訊,防止之後還有連續的資訊需要讀取,但環境不正确的問題
index = 0;
startOffset = 4;
msgLen -= getDataLen;
// 如果之後還有資訊則繼續讀
byteBuffer.clear();
count = channel.read(byteBuffer);
// 将position置為開始
byteBuffer.rewind();
if(count>0){
msgLen = byteBuffer.getInt();
}
}else{ // 出現了沾包
// 首先将目前資訊發送給所有用戶端
if(byteArrayOutputStream.size()>0)
this.readMsg(channel, byteArrayOutputStream);
// 然後擷取下一條資訊的長度資訊,并開始繼續寫入
// 首先擷取新的一條資訊的開始偏移位置
byteBuffer.position(msgLen);
// 計算資料真正的開始位置
startOffset = msgLen+4;
// 擷取資訊長度
msgLen = byteBuffer.getInt();
index = 0;
if(startOffset==count){
index++;
byteBuffer.clear();
count = channel.read(byteBuffer);
// 将position置為開始
byteBuffer.rewind();
}
}
}else{ // 目前資訊還沒有讀完,繼續讀
byteBuffer.clear();
msgLen -= getDataLen;
count = channel.read(byteBuffer);
// 将position置為開始
byteBuffer.rewind();
}
}
}
}
使用者名
由于我在server端僅僅實作了簡單的轉發功能,是以使用者名顯示的功能就落在了client端。
為了簡單起見,我在所有資訊發送前在資訊前面添加了目前使用者的使用者名,這樣可以保證所有client收發的消息都包含使用者名資訊
實作邏輯為
msg=username + ":" + msg
使用截圖
server
client1
client2
不足之處
- 對于消息發送一半client就退出的情況沒有做處理,應該添加一個逾時機制,如果超過多長時間沒有擷取到所有資料則放棄這條資料的接收。
- 代碼不夠規整。
參考
https://www.cnblogs.com/crazymakercircle/p/9941658.html
https://www.cnblogs.com/panchanggui/p/9518735.html
尚矽谷韓順平netty核心技術及源碼剖析
若有疑問或需要原始代碼請關注微信公衆号
完整代碼
server端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
public class ChatServer {
private int port = 2333; // 預設端口
private ServerSocketChannel serverSocketChannel;
private Selector selector;
private int baseBufferSize = 1024; // 預設buffer大小
private ByteBuffer oriAddr; // 用于轉發資訊來源位址
private ByteBuffer msgBuffer;
public ChatServer(int port){
this.port = port;
this.initEnv();
}
// 初始化伺服器
public void initEnv(){
try {
// 1. 取得一個serversocketchannel對象
this.serverSocketChannel = ServerSocketChannel.open();
// 2. 取得一個selector對象
this.selector = Selector.open();
// 3. 建立一個serversocket,并監聽2333端口
this.serverSocketChannel.socket().bind(new InetSocketAddress(this.port));
// 4. 設定非阻塞方式
this.serverSocketChannel.configureBlocking(false);
// 5. 将serversocketchannel對象注冊到selector,也就是将服務端綁定selector
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
// 6. 初始化原位址buffer
this.oriAddr = ByteBuffer.allocate(256);
this.msgBuffer = ByteBuffer.allocate(this.baseBufferSize);
// 輸出初始化成功資訊
System.out.println("init server success, the port is:"+ this.port);
}catch (Exception e){
e.printStackTrace();
}
}
// 開始監聽所有請求
public void listen(){
try{
// 循環周遊是否有事件
while (true){
// 阻塞0.1s看是否有用戶端連接配接,防止CPU全速空轉
if(this.selector.select(100)==0){
continue;
}
// 取得selectionkey,判斷通道裡的事件
Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey selectedKey = iterator.next();
if(selectedKey.isAcceptable()){
// 如果是連接配接事件,則給用戶端生成一個channel
SocketChannel socketChannel = this.serverSocketChannel.accept();
socketChannel.configureBlocking(false);
// 向selector注冊
socketChannel.register(this.selector, SelectionKey.OP_READ);
// 已經登入完成,接下來調用登入處理邏輯
this.login(socketChannel);
}else if(selectedKey.isReadable()){
readData(selectedKey);
}
iterator.remove();
}
}
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 接收到登陸請求時的處理邏輯
* @param socketChannel
* @throws IOException
*/
public void login(SocketChannel socketChannel) throws IOException {
System.out.println(socketChannel.getRemoteAddress().toString() + "上線了");
// 将上限消息發送給所有用戶端,加鎖防止同時出現有用戶端登陸和退出的情況。但我這裡隻有一個線程處理消息是以不是必須的
synchronized (oriAddr){
oriAddr.clear();
byte[] loginBytes = ("server: " + socketChannel.getRemoteAddress().toString() + "上線了").getBytes();
int remain = loginBytes.length % 4;
if(remain>0){
loginBytes = java.util.Arrays.copyOf(loginBytes, loginBytes.length+4-remain);
}
oriAddr.putInt(loginBytes.length);
oriAddr.put(loginBytes);
sendData2All(socketChannel, oriAddr);
}
}
/**
* 接收到退出請求時的處理邏輯
* @param socketChannel
* @throws IOException
*/
public void logout(SocketChannel socketChannel) throws IOException {
System.out.println(socketChannel.getRemoteAddress().toString() + "下線了");
// 将上限消息發送給所有用戶端,加鎖防止同時出現有用戶端登陸和退出的情況。但我這裡隻有一個線程處理消息是以不是必須的
synchronized (oriAddr){
oriAddr.clear();
byte[] logoutBytes = ("server: " + socketChannel.getRemoteAddress().toString() + "下線了").getBytes();
int remain = logoutBytes.length % 4;
if(remain>0){
logoutBytes = java.util.Arrays.copyOf(logoutBytes, logoutBytes.length+4-remain);
}
oriAddr.putInt(logoutBytes.length);
oriAddr.put(logoutBytes);
sendData2All(socketChannel, oriAddr);
}
socketChannel.close();
}
/**
* 讀取消息邏輯
* @param selectionKey
*/
private void readData(SelectionKey selectionKey){
try{
// 通過key反向得到channel
SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
// 建立讀取資料的緩沖區
ByteBuffer byteBuffer = this.msgBuffer;
// 首先清空buffer
byteBuffer.clear();
// 讀取資料,并記錄資料大小
int count = socketChannel.read(byteBuffer);
if(count>0){
System.out.println("get msg: from"+socketChannel.getRemoteAddress().toString());
// 防止消息資訊太大出現需要多次讀取
while (count>0){
sendData2All(socketChannel, byteBuffer);
byteBuffer.clear();
count = socketChannel.read(byteBuffer);
}
}else if(count==-1){
this.logout((SocketChannel)selectionKey.channel());
}
}catch (Exception e){
try {
this.logout((SocketChannel)selectionKey.channel());
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
/**
* 向所有用戶端發送消息,由于是中間方法,是以異常抛出
* @param socketChannel
* @param msg
* @throws IOException
*/
private void sendData2All(SocketChannel socketChannel, ByteBuffer msg) throws IOException {
msg.flip();
Iterator<SelectionKey> iterator = this.selector.keys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
SelectableChannel targetChannel = key.channel();
if((targetChannel instanceof SocketChannel) && !targetChannel.equals(socketChannel)){
// 直接通過buffer将資料轉發到其它用戶端
((SocketChannel)targetChannel).write(msg);
// 重置position,以友善下次使用buffer
msg.rewind();
}
}
}
/**
* 向單一用戶端發送消息
* @param targetChannel
* @param msg
* @throws IOException
*/
private void sendData2One(SocketChannel targetChannel, ByteBuffer msg) throws IOException {
if(targetChannel instanceof SocketChannel){
((SocketChannel)targetChannel).write(msg);
}
}
public static void main(String[] args) {
final int PORT = 2333;
ChatServer chatServer = new ChatServer(2333);
chatServer.listen();
}
}
client端
// todo 設定定時器,解決如果長時間沒有接收完畢,逾時放棄的問題
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Scanner;
public class ChatClient {
private String host = "127.0.0.1";
private int port = 2333;
private int baseBufferSize = 1024;
private int msgId = 1;
private Selector selector;
private SocketChannel socketChannel;
private HashMap<String, String> usernames;
private String username;
private ByteBuffer msgLenBuffer = ByteBuffer.allocate(4);
private ByteBuffer msgBuffer = ByteBuffer.allocate(baseBufferSize);
private ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
public ChatClient(String name){
this.username = name;
initEnv();
}
public ChatClient(String serverIp, int serverPort, String name){
this.host = serverIp;
this.port = serverPort;
this.username = name;
initEnv();
}
public void initEnv(){
try {
// 初始化選擇器
this.selector = Selector.open();
// 連接配接遠端伺服器
this.socketChannel = SocketChannel.open(new InetSocketAddress(this.host, this.port));
// 設定為非阻塞模式
this.socketChannel.configureBlocking(false);
// 注冊選擇器,并設定為read模式
this.socketChannel.register(selector, SelectionKey.OP_READ);
// 初始化使用者名記錄系統
this.usernames = new HashMap<>();
// 計算目前用戶端的使用者名
usernames.put(host+":"+port, this.username);
System.out.println(this.username + " login success!");
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 将資訊發送出去,為了防止沾包和半包的問題,會首先發送長度資訊,然後再發送具體資訊
* @param msgs
* @return 傳回資訊狀态,-1表示退出,0表示正常發送
* @throws IOException
*/
public int sendMsg(String msgs) throws IOException {
if("exit".equals(msgs)){
return -1;
}else{
String baseBlank = "aaaa"; // 先填充四個無意義字元,後續用于填充進去字元串長度
// 用來存儲資料的byte數組
byte[] msgBytes;
int flag;
// 如果有多行資料則分開發送
String[] splitedData = msgs.split("\n");
for (String msg : splitedData) {
msgBytes = (baseBlank + this.username + ":" + msg).getBytes();
// 用以保證所有發送的資訊長度都為4的倍數
flag = msgBytes.length % 4;
if(flag>0){
msgBytes = java.util.Arrays.copyOf(msgBytes, msgBytes.length + 4 - flag);
}
// 将消息長度資訊寫入消息體中
System.arraycopy(int2Bytes(msgBytes.length-4), 0, msgBytes, 0, 4);
// 發送
this.socketChannel.write(ByteBuffer.wrap(msgBytes));
}
return 0;
}
}
/**
* 将int轉為byte數組
* @param num
* @return
*/
public static byte[] int2Bytes(int num) {
byte[] bytes = new byte[4];
//通過移位運算,截取低8位的方式,将int儲存到byte數組
bytes[0] = (byte)(num >>> 24);
bytes[1] = (byte)(num >>> 16);
bytes[2] = (byte)(num >>> 8);
bytes[3] = (byte)num;
return bytes;
}
/**
* 執行讀取資訊操作
* @param channel
* @param msg
* @throws IOException
*/
public void readMsg(SocketChannel channel, ByteArrayOutputStream msg) throws IOException {
System.out.println(msg.toString().trim());
// 由于共用一個輸出流,是以在使用完畢後要清空
msg.reset();
}
/**
* 執行監聽服務端發來資訊操作
* @throws IOException
*/
public void listen() throws IOException {
while (!Thread.currentThread().isInterrupted()){
int readyChannelNum = this.selector.select();
if(readyChannelNum>0){
Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey selectedKey = iterator.next();
if(selectedKey.isReadable()){
readData(selectedKey);
}
// 手動從疊代器中移除目前元素,防止重複處理
iterator.remove();
}
}
}
}
/**
* 讀取資料。讀取資料時需要處理沾包和半包的問題
* @param selectedKey
* @throws IOException
*/
public void readData(SelectionKey selectedKey) throws IOException {
SocketChannel channel = (SocketChannel)selectedKey.channel();
ByteBuffer byteBuffer = this.msgBuffer;
// 首先清空緩存
byteBuffer.clear();
// 首先從channel中讀取一部分資料到buffer
int count = channel.read(byteBuffer);
// 将buffer從寫狀态轉換為讀狀态
byteBuffer.flip();
// 首先擷取資訊的長度
int msgLen = byteBuffer.getInt();
int index = 0;
int startOffset = 4;
int getDataLen;
// 隻要還有資料就一直讀取,隻有在讀取完一個階段後才會執行其他任務
while (count>0 || msgLen!=0){ // 隻有讀完所有完整的資訊才算結束,出現半包則繼續等待
// 剛開始接收資訊要首先擷取int長度,共4個byte
if(index==0) {
index++;
getDataLen = Math.min(count - startOffset, msgLen);
byteArrayOutputStream.write(byteBuffer.array(), startOffset, getDataLen);
// 判斷目前是否讀完了一條資訊
if(getDataLen==msgLen){ // 已經讀完了一條資訊
// 判斷讀完這條資訊後是否把整個buffer讀完了,防止出現沾包的情況
if(msgLen==(count - startOffset)){ // 讀完這條資訊後把整個buffer讀完了,也就是沒有出現沾包的情況
byteBuffer.clear();
if(byteArrayOutputStream.size()>0)
this.readMsg(channel, byteArrayOutputStream);
// 初始化資訊,防止之後還有連續的資訊需要讀取,但環境不正确的問題
index = 0;
startOffset = 4;
msgLen -= getDataLen;
// 如果之後還有資訊則繼續讀
byteBuffer.clear();
count = channel.read(byteBuffer);
// 将position置為開始
byteBuffer.rewind();
if(count>0){
msgLen = byteBuffer.getInt();
}
}else{ // 出現了沾包
// 首先将目前資訊發送給所有用戶端
if(byteArrayOutputStream.size()>0)
this.readMsg(channel, byteArrayOutputStream);
// 然後擷取下一條資訊的長度資訊,并開始繼續寫入
// 首先擷取新的一條資訊的開始偏移位置
byteBuffer.position(startOffset+getDataLen);
// 計算資料真正的開始位置
startOffset = startOffset+getDataLen+4;
// 擷取資訊長度
msgLen = byteBuffer.getInt();
index = 0;
if(startOffset==count){
index++;
byteBuffer.clear();
count = channel.read(byteBuffer);
// 将position置為開始
byteBuffer.rewind();
}
}
}else{ // 目前資訊還沒有讀完,繼續讀
byteBuffer.clear();
msgLen -= getDataLen;
count = channel.read(byteBuffer);
// 将position置為開始
byteBuffer.rewind();
}
}else{
getDataLen = Math.min(count, msgLen);
byteArrayOutputStream.write(byteBuffer.array(), 0, getDataLen);
// 判斷目前是否讀完了一條資訊
if(getDataLen==msgLen){ // 已經讀完了一條資訊
// 判斷讀完這條資訊後是否把整個buffer讀完了,防止出現沾包的情況
if(msgLen==count){ // 讀完這條資訊後把整個buffer讀完了,也就是沒有出現沾包的情況
if(byteArrayOutputStream.size()>0)
this.readMsg(channel, byteArrayOutputStream);
// 初始化資訊,防止之後還有連續的資訊需要讀取,但環境不正确的問題
index = 0;
startOffset = 4;
msgLen -= getDataLen;
// 如果之後還有資訊則繼續讀
byteBuffer.clear();
count = channel.read(byteBuffer);
// 将position置為開始
byteBuffer.rewind();
if(count>0){
msgLen = byteBuffer.getInt();
}
}else{ // 出現了沾包
// 首先将目前資訊發送給所有用戶端
if(byteArrayOutputStream.size()>0)
this.readMsg(channel, byteArrayOutputStream);
// 然後擷取下一條資訊的長度資訊,并開始繼續寫入
// 首先擷取新的一條資訊的開始偏移位置
byteBuffer.position(msgLen);
// 計算資料真正的開始位置
startOffset = msgLen+4;
// 擷取資訊長度
msgLen = byteBuffer.getInt();
index = 0;
if(startOffset==count){
index++;
byteBuffer.clear();
count = channel.read(byteBuffer);
// 将position置為開始
byteBuffer.rewind();
}
}
}else{ // 目前資訊還沒有讀完,繼續讀
byteBuffer.clear();
msgLen -= getDataLen;
count = channel.read(byteBuffer);
// 将position置為開始
byteBuffer.rewind();
}
}
}
}
// 退出程式
public void logout() throws IOException {
this.socketChannel.close();
this.selector.close();
}
public static void main(String[] args) {
ChatClient nowUser = new ChatClient("nowUser");
// 開啟線程監聽發來的請求
Thread listenThread = new Thread(() -> {
try {
nowUser.listen();
} catch (IOException e) {
e.printStackTrace();
}
});
listenThread.start();
// 模拟發送資料
// new Thread(()->{
// try {
// while (true){
// nowUser.sendMsg(nowUser.username);
// Thread.sleep(100);
// }
// } catch (IOException | InterruptedException e) {
// e.printStackTrace();
// }
// }).start();
// 從控制台擷取輸入并發送
Scanner scanner = new Scanner(System.in);
while (true){
String msg = scanner.nextLine();
try {
if(nowUser.sendMsg(msg)==-1){
// 中斷監聽線程
listenThread.interrupt();
// 循環等待監聽線程結束
while (!listenThread.isInterrupted());
// 執行退出程式
nowUser.logout();
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}