错误信息
webSocket多线程推送出错,报错信息为The remote endpoint was in state [TEXT_PARTIAL_WRITING]
错误原因
两个方法有可能同时执行,当方法1或者方法2遍历到某一个session并且调用sendMessage发送消息的时候,另外一个方法也正好也在使用相同的session发送另外一个消息(同一个session消息发送冲突了,也就是说同一个时刻,多个线程向一个socket写数据冲突了),就会报上面的异常。
解决方案
采用的是将session存入2个数组,数组A放置空闲session,数组B存放忙碌中的session。何为忙碌?当前session进行send发送信息还没结束即为忙碌。
在发送信息时判断当前session是忙碌状态还是空闲状态:
如为空闲,直接执行发送并且把当前session在空闲(A)中移除,并把它放入忙碌(B)的数组中,当执行完成,将空闲的session从忙碌(B)中移除,并且存入空闲(A)数组中。
如为忙碌,则跳过发送,延迟递归调用本身发送方法,延迟后执行再作判断是当前session是否忙碌,递归直到空闲再发送这则数据
代码实现
声明静态变量集合用于存放空闲以及忙碌的session
// 记录空闲Session集合
private static CopyOnWriteArraySet<Session> idle = new CopyOnWriteArraySet<Session>();
// 记录正在使用中Session集合,value为Future,表示使用情况
private static ConcurrentHashMap<Session, Future<Void>> busy = new ConcurrentHashMap<Session, Future<Void>>();
发送数据
// 新增Session
public static void open(Session session) {
idle.add(session);
}
// 关闭Session
public static void close(Session session) {
idle.remove(session);
busy.remove(session);
}
// 使用session发送消息
public static void send(Session session, SocketMessage message, Integer timeout) throws InterruptedException {
if (timeout < 0) { // timeout后放弃本次发送
return;
}
if (idle.remove(session)) { // 判断session是否空闲,抢占式
busy.put(session, session.getAsyncRemote().sendText(JSON.toJSONString(message)));
} else {
// 若session当前不在idle集合,则去busy集合中查看session上次是否已经发送完毕,即采用惰性判断
synchronized (busy) {
if (busy.containsKey(session) && busy.get(session).isDone()) {
busy.remove(session);
idle.add(session);
}
}
// 重试
Thread.sleep(100);
send(session, message, timeout - 100);
}
}
添加session->客户端上线时操作
@OnOpen
public void onOpen(Session session,@PathParam("param")String socketUserId) {
System.out.println("有新的客户端连接了: "+session.getId());
open(session);
}
移除session->客户端掉线时操作
@OnClose
public void onClose(Session session) {
System.out.println("用户" + session.getId() + "离线");
close(session);
}
使用发送消息
try {
send(session, "大家好,我是零三,很高兴认识大家,我的网址是web03.cn", 3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
总结
此方法可以很有效的解决低并发问题,这只是一种解决方案,当然可能有更优的解决方法,交给多线程的同步机制去执行,因为我个人用多线程无法实现,所以使用当前下策