天天看點

webSocket多線程推送出錯錯誤資訊錯誤原因解決方案代碼實作總結

錯誤資訊

webSocket多線程推送出錯,報錯資訊為The remote endpoint was in state [TEXT_PARTIAL_WRITING]

錯誤原因

兩個方法有可能同時執行,當方法1或者方法2周遊到某一個session并且調用sendMessage發送消息的時候,另外一個方法也正好也在使用相同的session發送另外一個消息(同一個session消息發送沖突了,也就是說同一個時刻,多個線程向一個socket寫資料沖突了),就會報上面的異常。

解決方案

采用的是将session存入2個數組,數組A放置空閑session,數組B存放忙碌中的session。何為忙碌?目前session進行send發送資訊還沒結束即為忙碌。

在發送資訊時判斷目前session是忙碌狀态還是空閑狀态:

如為空閑,直接執行發送并且把目前session在空閑(A)中移除,并把它放入忙碌(B)的數組中,當執行完成,将空閑的session從忙碌(B)中移除,并且存入空閑(A)數組中。

如為忙碌,則跳過發送,延遲遞歸調用本身發送方法,延遲後執行再作判斷是目前session是否忙碌,遞歸直到空閑再發送這則資料

代碼實作

聲明靜态變量集合用于存放空閑以及忙碌的session

// 記錄空閑Session集合
    private static CopyOnWriteArraySet<Session> idle = new CopyOnWriteArraySet<Session>();
    // 記錄正在使用中Session集合,value為Future,表示使用情況
    private static ConcurrentHashMap<Session, Future<Void>> busy = new ConcurrentHashMap<Session, Future<Void>>();
           

發送資料

// 新增Session
    public static void open(Session session) {
        idle.add(session);
    }

    // 關閉Session
    public static void close(Session session) {
        idle.remove(session);
        busy.remove(session);
    }

    // 使用session發送消息
    public static void send(Session session, SocketMessage message, Integer timeout) throws InterruptedException {
        if (timeout < 0) { // timeout後放棄本次發送
            return;
        }
        if (idle.remove(session)) { // 判斷session是否空閑,搶占式
           busy.put(session, session.getAsyncRemote().sendText(JSON.toJSONString(message)));
        } else {
            // 若session目前不在idle集合,則去busy集合中檢視session上次是否已經發送完畢,即采用惰性判斷
            synchronized (busy) {
                if (busy.containsKey(session) && busy.get(session).isDone()) {
                    busy.remove(session);
                    idle.add(session);
                }
            }
            // 重試
            Thread.sleep(100);
            send(session, message, timeout - 100);
        }
    }
           

添加session->用戶端上線時操作

@OnOpen
    public void onOpen(Session session,@PathParam("param")String socketUserId) {
         System.out.println("有新的用戶端連接配接了: "+session.getId());
        open(session);
    }
           

移除session->用戶端掉線時操作

@OnClose
    public void onClose(Session session) {
        System.out.println("使用者" + session.getId() + "離線");
        close(session);
    }
           

使用發送消息

try {
	send(session, "大家好,我是零三,很高興認識大家,我的網址是web03.cn", 3000);
} catch (InterruptedException e) {
	e.printStackTrace();
}
           

總結

此方法可以很有效的解決低并發問題,這隻是一種解決方案,當然可能有更優的解決方法,交給多線程的同步機制去執行,因為我個人用多線程無法實作,是以使用目前下策