天天看點

java nio消息半包、粘包解決方案

java nio消息半包、粘包解決方案

問題背景

NIO是面向緩沖區進行通信的,不是面向流的。我們都知道,既然是緩沖區,那它一定存在一個固定大小。這樣一來通常會遇到兩個問題:

消息粘包:當緩沖區足夠大,由于網絡不穩定種種原因,可能會有多條消息從通道讀入緩沖區,此時如果無法厘清資料包之間的界限,就會導緻粘包問題;

消息不完整:若消息沒有接收完,緩沖區就被填滿了,會導緻從緩沖區取出的消息不完整,即半包的現象。

介紹這個問題之前,務必要提一下我代碼整體架構。

代碼參見GitHub倉庫

https://github.com/CuriousLei/smyl-im

在這個項目中,我的NIO核心庫設計思路流程圖如下所示

介紹:

服務端為每一個連接配接上的用戶端建立一個Connector對象,為其提供IO服務;

ioArgs對象内部執行個體域引用了緩沖區buffer,作為直接與channel進行資料互動的緩沖區;

兩個線程池,分别操控ioArgs進行讀和寫操作;

connector與ioArgs關系:(1)輸入,線程池處理讀事件,資料寫入ioArgs,并回調給connector;(2)輸出,connector将資料寫入ioArgs,将ioArgs傳入Runnable對象,供線程池處理;

兩個selector線程,分别監聽channel的讀和寫事件。事件就緒,則觸發線程池工作。

思路

光這樣實作,必然會有粘包、半包問題。要重制這兩個問題也很簡單。

ioArgs中把緩沖區設定小一點,發送一條大于該長度的資料,服務端會當成兩條消息讀取,即消息不完整;

線上程代碼中,加一個Thread.sleep()延時等待,用戶端連續發幾條消息(總長度小于緩沖區大小),也可以重制粘包現象。

這個問題實質上是消息體與緩沖區資料不一一對應導緻的。那麼,如何解決呢?

固定頭部方案

可以采用固定頭部方案來解決,頭部設定四個位元組,存儲一個int值,記錄後面資料的長度。以此來标記一個消息體。

讀取資料時,根據頭部的長度資訊,按序讀取ioArgs緩沖區中的資料,若沒有達到長度要求,繼續讀下一個ioArgs。這樣自然不會出現粘包、半包問題。

輸出資料時,也采用同樣的機制封裝資料,首部四個位元組記錄長度。

我的工程項目中,用戶端和服務端共用一個nio核心包,即niohdl,可保證收發資料格式一緻。

設計方案

要實作以上設想,必須在connector和ioArgs之間加一層Dispatcher類,用于處理消息體與緩沖區之間的轉化關系(消息體取個名字:Packet)。根據輸入和輸出的不同,分别叫ReceiveDispatcher和SendDispatcher。即通過它們來操作Packet與ioArgs之間的轉化。

Packet

定義這個消息體,繼承關系如下圖所示:

Packet是基類,代碼如下:

package cn.buptleida.niohdl.core;

import java.io.Closeable;

import java.io.IOException;

/**

  • 公共的資料封裝
  • 提供了類型以及基本的長度的定義

    */

public class Packet implements Closeable {

protected byte type;
protected int length;

public byte type(){
    return type;
}

public int length(){
    return length;
}

@Override
public void close() throws IOException {

}           

}

SendPacket和ReceivePacket分别代表發送消息體和接收消息體。StringReceivePacket和StringSendPacket代表字元串類的消息,因為本次實踐隻限于字元串消息的收發,今後可能有檔案之類的,有待擴充。

代碼中必然會涉及到位元組數組的操作,是以,以StringSendPacket為例,需要提供将String轉化為byte[]的方法。代碼如下所示:

package cn.buptleida.niohdl.box;

import cn.buptleida.niohdl.core.SendPacket;

public class StringSendPacket extends SendPacket {

private final byte[] bytes;

public StringSendPacket(String msg) {
    this.bytes = msg.getBytes();
    this.length = bytes.length;//父類中的執行個體域
}

@Override
public byte[] bytes() {
    return bytes;
}           

SendDispatcher

在connector對象的執行個體域中會引用一個SendDispatcher對象。發送資料時,會通過SendDispatcher中的方法對資料進行封裝和處理。其大緻的關系圖如下所示:

SendDispatcher中設定任務隊列Queue queue,需要發送消息時,connector将消息寫入sendPacket,并存入隊列queue,執行出隊。用packetTemp變量引用出隊的元素,将四位元組的長度資訊和packetTemp寫入ioArgs的緩沖區中,發送完畢之後,再判斷packetTemp是否完整寫出(使用position和total指針标記、判斷),決定繼續輸出packetTemp的内容,還是開始下一輪出隊。

這個過程的程式框圖如下所示:

在代碼中,SendDispatcher實際上是一個接口,我用AsyncSendDispatcher實作此接口,代碼如下:

package cn.buptleida.niohdl.impl.async;

import cn.buptleida.niohdl.core.*;

import cn.buptleida.utils.CloseUtil;

import java.util.Queue;

import java.util.concurrent.ConcurrentLinkedDeque;

import java.util.concurrent.atomic.AtomicBoolean;

public class AsyncSendDispatcher implements SendDispatcher {

private final AtomicBoolean isClosed = new AtomicBoolean(false);
private Sender sender;
private Queue<SendPacket> queue = new ConcurrentLinkedDeque<>();
private AtomicBoolean isSending = new AtomicBoolean();
private ioArgs ioArgs = new ioArgs();
private SendPacket packetTemp;
//目前發送的packet大小以及進度
private int total;
private int position;

public AsyncSendDispatcher(Sender sender) {
    this.sender = sender;
}

/**
 * connector将資料封裝進packet後,調用這個方法
 * @param packet
 */
@Override
public void send(SendPacket packet) {
    queue.offer(packet);//将資料放進隊列中
    if (isSending.compareAndSet(false, true)) {
        sendNextPacket();
    }
}

@Override
public void cancel(SendPacket packet) {

}

/**
 * 從隊列中取資料
 * @return
 */
private SendPacket takePacket() {
    SendPacket packet = queue.poll();
    if (packet != null && packet.isCanceled()) {
        //已經取消不用發送
        return takePacket();
    }
    return packet;
}

private void sendNextPacket() {
    SendPacket temp = packetTemp;
    if (temp != null) {
        CloseUtil.close(temp);
    }
    SendPacket packet = packetTemp = takePacket();
    if (packet == null) {
        //隊列為空,取消發送狀态
        isSending.set(false);
        return;
    }

    total = packet.length();
    position = 0;

    sendCurrentPacket();
}

private void sendCurrentPacket() {
    ioArgs args = ioArgs;

    args.startWriting();//将ioArgs緩沖區中的指針設定好

    if (position >= total) {
        sendNextPacket();
        return;
    } else if (position == 0) {
        //首包,需要攜帶長度資訊
        args.writeLength(total);
    }

    byte[] bytes = packetTemp.bytes();
    //把bytes的資料寫入到IoArgs中
    int count = args.readFrom(bytes, position);
    position += count;

    //完成封裝
    args.finishWriting();//flip()操作
    //向通道注冊OP_write,将Args附加到runnable中;selector線程監聽到就緒即可觸發線程池進行消息發送
    try {
        sender.sendAsync(args, ioArgsEventListener);
    } catch (IOException e) {
        closeAndNotify();
    }
}

private void closeAndNotify() {
    CloseUtil.close(this);
}

@Override
public void close(){
    if (isClosed.compareAndSet(false, true)) {
        isSending.set(false);
        SendPacket packet = packetTemp;
        if (packet != null) {
            packetTemp = null;
            CloseUtil.close(packet);
        }
    }
}

/**
 * 接收回調,來自writeHandler輸出線程
 */
private ioArgs.IoArgsEventListener ioArgsEventListener = new ioArgs.IoArgsEventListener() {
    @Override
    public void onStarted(ioArgs args) {

    }

    @Override
    public void onCompleted(ioArgs args) {
        //繼續發送目前包packetTemp,因為可能一個包沒發完
        sendCurrentPacket();
    }
};
           

ReceiveDispatcher

同樣,ReceiveDispatcher也是一個接口,代碼中用AsyncReceiveDispatcher實作。在connector對象的執行個體域中會引用一個AsyncReceiveDispatcher對象。接收資料時,會通過ReceiveDispatcher中的方法對接收到的資料進行拆包處理。其大緻的關系圖如下所示:

每一個消息體的首部會有一個四位元組的int字段,代表消息的長度值,按照這個長度來進行讀取。如若一個ioArgs未滿足這個長度,就讀取下一個ioArgs,保證資料包的完整性。這個流程就不畫程式框圖了,偷個懶hhhh。其實看下面代碼注釋已經很清晰了,容易了解。

AsyncReceiveDispatcher的代碼如下所示:

import cn.buptleida.niohdl.box.StringReceivePacket;

import cn.buptleida.niohdl.core.ReceiveDispatcher;

import cn.buptleida.niohdl.core.ReceivePacket;

import cn.buptleida.niohdl.core.Receiver;

import cn.buptleida.niohdl.core.ioArgs;

public class AsyncReceiveDispatcher implements ReceiveDispatcher {

private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final Receiver receiver;
private final ReceivePacketCallback callback;
private ioArgs args = new ioArgs();
private ReceivePacket packetTemp;
private byte[] buffer;
private int total;
private int position;

public AsyncReceiveDispatcher(Receiver receiver, ReceivePacketCallback callback) {
    this.receiver = receiver;
    this.receiver.setReceiveListener(ioArgsEventListener);
    this.callback = callback;
}

/**
 * connector中調用該方法進行
 */
@Override
public void start() {
    registerReceive();
}

private void registerReceive() {

    try {
        receiver.receiveAsync(args);
    } catch (IOException e) {
        closeAndNotify();
    }
}

private void closeAndNotify() {
    CloseUtil.close(this);
}

@Override
public void stop() {

}

@Override
public void close() throws IOException {
    if(isClosed.compareAndSet(false,true)){
        ReceivePacket packet = packetTemp;
        if(packet!=null){
            packetTemp = null;
            CloseUtil.close(packet);
        }
    }
}

/**
 * 回調方法,從readHandler輸入線程中回調
 */
private ioArgs.IoArgsEventListener ioArgsEventListener = new ioArgs.IoArgsEventListener() {
    @Override
    public void onStarted(ioArgs args) {
        int receiveSize;
        if (packetTemp == null) {
            receiveSize = 4;
        } else {
            receiveSize = Math.min(total - position, args.capacity());
        }
        //設定接受資料大小
        args.setLimit(receiveSize);
    }

    @Override
    public void onCompleted(ioArgs args) {
        assemblePacket(args);
        //繼續接受下一條資料,因為可能同一個消息可能分隔在兩份IoArgs中
        registerReceive();
    }
};

/**
 * 解析資料到packet
 * @param args
 */
private void assemblePacket(ioArgs args) {
    if (packetTemp == null) {
        int length = args.readLength();
        packetTemp = new StringReceivePacket(length);
        buffer = new byte[length];
        total = length;
        position = 0;
    }
    //将args中的資料寫進外面buffer中
    int count = args.writeTo(buffer,0);
    if(count>0){
        //将資料存進StringReceivePacket的buffer當中
        packetTemp.save(buffer,count);
        position+=count;
        
        if(position == total){
            completePacket();
            packetTemp = null;
        }
    }
    
}

private void completePacket() {
    ReceivePacket packet = this.packetTemp;
    CloseUtil.close(packet);
    callback.onReceivePacketCompleted(packet);
}
           

總結

其實粘包、半包的解決方案并沒有什麼奧秘,單純地複雜而已。方法核心就是自定義一個消息體Packet,完成Packet中的byte數組與緩沖區數組之間的複制轉化即可。當然,position、limit等等指針的輔助很重要。

總結這個部落格,也是将目前為止的工作進行梳理和記錄。我将通過smyl-im這個項目來持續學習+實踐。因為之前學習過程中有很多零碎的知識點,都躺在我的有道雲筆記裡,感覺沒必要總結成部落格。本次部落格講的内容剛好是一個成體系的東西,正好可以将這個項目背景帶出來,後續的部落格就可以在這基礎上衍生拓展了。

原文位址

https://www.cnblogs.com/buptleida/p/12732288.html