天天看點

Netty和WebSocket實作IM,讨論Channel和使用者辨別的雙向綁定,離線消息和消息簽收問題引入"綁定"類型的消息需要攜帶token離線消息和消息簽收業務層面的消息類型和消息模型定義核心代碼

2021-5-10    更新mark

問題引入

相信很多人用過Netty寫過聊天室的簡單案例吧,可以說是模闆代碼了,沒有結合業務。如果我們要做項目中的即時通訊子產品(IM),需要将使用者A發的消息轉發給使用者B,将會不可避免的遇到一個問題:如何快速找到使用者B所建立的Channel  (使用者 -> Channel 的映射)?圍繞我們的聊天業務,離線消息又如何進行推送?一個使用者建立Channel之後,我們要想知道他有沒有未簽收的離線消息,就必定要知道使用者辨別。原則來講,我們又如何避免一個使用者重複建立Channel?

換言之,在IM業務中,我們要解決:Channel和使用者辨別綁定的問題。他們的映射關系是一對一的。

"綁定"類型的消息需要攜帶token

要想綁定使用者辨別,用戶端就必須在WebSocket建立之後(Channel建立之後),立馬發送一條綁定類型的消息給後端,該消息必須要攜帶使用者唯一辨別,後端建立并維護Channel和使用者的一對一映射關系。那麼綁定類型的消息,攜帶的使用者辨別是什麼?用戶端本地存儲的userid?其實這并不合理,應該攜帶token!(我這裡用的jwt,jwt裡面的載荷有userid)。

為什麼攜帶token更加合理?因為token可以代表使用者的一次有效的登入狀态,我們可以在後端驗證使用者登入狀态有效性(嚴格的可以做單點登入的驗證),并且可以查出使用者的身份資訊,包括userid。綁定類型的消息如果攜帶userid,之是以說不合理,是因為:假設使用者id就是自增長的unsigned int,那麼userid就很容易猜到,就是一個純數字嘛,那麼拿一個純數字,就可以随便跟我後端建立websocket連接配接,對後端來說,必然不安全。

離線消息和消息簽收

什麼是離線消息?比如說使用者A給使用者B發送一條消息,後端轉發的時候發現使用者B不線上(換言之,就是沒有建立WebSocket連接配接,沒有建立Channel)。那麼這條消息對于B來說就是離線消息。

什麼是簽收?使用者A給使用者B發送一條消息,B同時也線上,他就能收到這一條消息,那麼這條消息就是“已簽收”。假如,此時B不線上,那他肯定就沒辦法收到,就稱這條消息“未簽收”。

如果B不線上,我們顯然沒有辦法立馬将消息轉發給B,需要将消息暫存到資料庫。當使用者B上線(Channel建立之後),我們去資料庫查詢他是否有未簽收的消息,如果有,則将未簽收的消息立刻推送給B。

這裡,我們遇到2個問題。

1、我們要去資料查詢使用者未簽收的消息,就必須知道這是哪個使用者。(Channel -> 使用者)。我們前面讨論的 Channel和使用者辨別的雙向綁定 就解決了這個問題。

2、要想知道消息有沒有成功被B收到,我們就必須給消息(資料庫表)增設一個簽收狀态字段,同時使用者在成功收到消息之後,要立馬告訴後端,該消息已經簽收了。是以我們還有一種類型的消息。稱為 “簽收”類型的消息

業務層面的消息類型和消息模型定義

除了我們上面讨論到的

1、“綁定”類型的消息:攜帶token

2、“簽收”類型的消息:攜帶消息id。分為單簽和多簽,為了友善,如果是多簽,我們與前端約定,将多個消息id之間以逗号作為分隔符拼接成字元串

之外,我們的聊天業務中還涉及到其他一些類型的消息,比如說:聊天消息,好友申請消息,拉取新好友類型的消息,以及 心跳類型的消息,等等。下面我們再來分析一下聊天類型的消息。

3、“聊天”類型的消息

這個基于業務,可以分為:單聊和群聊。根據消息内容的不同,又可以分為:文字消息、圖檔消息、語音消息等。這裡簡單起見,我們以 單聊、文字消息 為例子進行讨論。這種類型的消息,需要攜帶哪些資料?顯然接收者的userid和文字消息的内容(String類型)是必須的。如果是群聊,還得攜帶上groupid。

每種類型的消息,攜帶的資料可能都不同。顯然需要定義泛型。見下面:

/**
 * @author passerbyYSQ
 * @create 2021-02-05 22:31
 */
@Data
public class MsgModel<T> implements Serializable {
    // 消息類型
    private Integer action;
    // 消息實體
    private T data;
}
           

為了規範定義消息類型,我們另外定義枚舉類:

package net.ysq.webchat.netty.entity;

/**
 *
 * @Description: 發送消息的動作 枚舉
 */
public enum MsgActionEnum {

	BIND(1, "第一次(或重連)初始化連接配接"),
	CHAT(2, "聊天消息"),
	SIGNED(3, "消息簽收"),
	KEEP_ALIVE(4, "心跳消息"),
	PULL_FRIEND(5, "拉取好友"),
	FRIEND_REQUEST(6, "請求添加為好友"),
	FORCE_OFFLINE(7, "賬号在異地登入,您已被擠下線");

	public final Integer type;
	public final String content;

	MsgActionEnum(Integer type, String content){
		this.type = type;
		this.content = content;
	}

	public Integer getType() {
		return type;
	}
}
           

核心代碼

維護使用者辨別和Channel映射關系的UserChannelRepository

package net.ysq.webchat.netty;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import net.ysq.webchat.netty.entity.MsgActionEnum;
import net.ysq.webchat.netty.entity.MsgModel;
import net.ysq.webchat.utils.SpringUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 使用者id和channel關聯的倉庫
 *
 * @author passerbyYSQ
 * @create 2021-02-05 23:20
 */
@Slf4j
public class UserChannelRepository {

    //private final static Logger logger = LoggerFactory.getLogger(UserChannelRepository.class);

    private static ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private static Map<String, Channel> USER_CHANNEL = new ConcurrentHashMap<>();
    private static final Object bindLocker = new Object();
    private static final Object removeLocker = new Object();

    public static void bind(String userId, Channel channel) {
        synchronized (bindLocker) {
            // 此時channel一定已經在ChannelGroup中了

            // 之前已經綁定過了,移除并釋放掉之前綁定的channel
            if (USER_CHANNEL.containsKey(userId)) { // map  userId --> channel
                Channel oldChannel = USER_CHANNEL.get(userId);
                CHANNEL_GROUP.remove(oldChannel);
                oldChannel.close();
            }

            // 雙向綁定
            // channel -> userId
            AttributeKey<String> key = AttributeKey.valueOf("userId");
            channel.attr(key).set(userId);

            // userId -> channel
            USER_CHANNEL.put(userId, channel);
        }
    }

    /**
     * 從通道中擷取userId。隻要userId和channel綁定周,這個方法就一定能擷取的到
     * @param channel
     * @return
     */
    public static String getUserId(Channel channel) {
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        return channel.attr(key).get();
    }

    public static void add(Channel channel) {
        CHANNEL_GROUP.add(channel);
    }

    public static void remove(Channel channel) {
        synchronized(removeLocker) { // 確定原子性

            String userId = getUserId(channel);

            // userId有可能為空。可能chanelActive之後,由于前端原因(或者網絡原因)沒有及時綁定userId。
            // 此時netty認為channelInactive了,就移除通道,這時userId就是null
            if (!StringUtils.isEmpty(userId)) {
                USER_CHANNEL.remove(userId); // map
            }

            CHANNEL_GROUP.remove(channel);

            // 關閉channel
            channel.close();
        }
    }

    public static void remove(String userId) {
        synchronized(removeLocker) { // 確定原子性

            Channel channel = USER_CHANNEL.get(userId);
            USER_CHANNEL.remove(userId); // map
            CHANNEL_GROUP.remove(channel);

            // 關閉channel
            if (!ObjectUtils.isEmpty(channel)) {
                channel.close();
            }
        }
    }

    /**
     * 判斷使用者是否線上
     * map和channelGroup中均能找得到對應的channel說明使用者線上
     * @return      線上就傳回對應的channel,不線上傳回null
     */
    public static Channel isBind(String userId) {
        Channel channel = USER_CHANNEL.get(userId); // map
        if (ObjectUtils.isEmpty(channel)) {
            return null;
        }
        return CHANNEL_GROUP.find(channel.id());
    }

    public static boolean isBind(Channel channel) {
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = channel.attr(key).get();
        return !ObjectUtils.isEmpty(userId) &&
                !ObjectUtils.isEmpty(USER_CHANNEL.get(userId));
    }

    public static void forceOffLine(String userId) {
        Channel channel = isBind(userId);
        if (!ObjectUtils.isEmpty(channel)) {
            // 推送下線通知
            MsgModel<Object> msgModel = new MsgModel<>();
            msgModel.setAction(MsgActionEnum.FORCE_OFFLINE.type);
            msgModel.setData(MsgActionEnum.FORCE_OFFLINE.content);
            pushMsg(userId, msgModel);

            // 移除通道。服務端單方面關閉連接配接。前端心跳會發送失敗
            remove(userId);
        }
    }

    /**
     * 消息推送
     * @param receiverId
     * @param msgModel
     */
    public static void pushMsg(String receiverId, MsgModel msgModel) {
        Channel receiverChannel = isBind(receiverId);
        if (!ObjectUtils.isEmpty(receiverChannel)) {
            TextWebSocketFrame frame = new TextWebSocketFrame(toJson(msgModel));
            receiverChannel.writeAndFlush(frame);
        } else {
            // 離線狀态
            log.info("{} 使用者離線", receiverId);
        }
    }

    private static String toJson(MsgModel msgModel) {
        // 線上,就推送;離線,不做處理
        ObjectMapper mapper = SpringUtils.getBean(ObjectMapper.class);
        try {
            return mapper.writeValueAsString(msgModel);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            return null;
        }
    }

    public synchronized static void print() {
        log.info("所有通道的長id:");
        for (Channel channel : CHANNEL_GROUP) {
            log.info(channel.id().asLongText());
        }
        log.info("userId -> channel 的映射:");
        for (Map.Entry<String, Channel> entry : USER_CHANNEL.entrySet()) {
            log.info("userId: {} ---> channelId: {}", entry.getKey(), entry.getValue().id().asLongText());
        }
    }

}
           

業務Handler:TextMsgHandler

package net.ysq.webchat.netty;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import net.ysq.webchat.netty.entity.MsgActionEnum;
import net.ysq.webchat.netty.entity.MsgModel;
import net.ysq.webchat.netty.entity.SingleChatMsgRequest;
import net.ysq.webchat.netty.entity.SingleChatMsgResponse;
import net.ysq.webchat.po.ChatMsg;
import net.ysq.webchat.service.ChatMsgService;
import net.ysq.webchat.service.FriendService;
import net.ysq.webchat.utils.JwtUtils;
import net.ysq.webchat.utils.RedisUtils;
import net.ysq.webchat.utils.SpringUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.util.HtmlUtils;

import java.util.ArrayList;
import java.util.List;

/**
 * 用于處理文本消息的handler
 *
 * @author passerbyYSQ
 * @create 2021-02-05 21:23
 */
@Slf4j
public class TextMsgHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        // json串
        log.info("接收到的文本消息:{}", msg.text());

        ChatMsgService chatMsgService = (ChatMsgService) SpringUtils.getBean("chatMsgServiceImpl");
        FriendService friendService = (FriendService) SpringUtils.getBean("friendServiceImpl");
        RedisUtils redisUtils = (RedisUtils) SpringUtils.getBean("redisUtils");
        ObjectMapper objectMapper = SpringUtils.getBean(ObjectMapper.class);

        // 消息類型
        JsonNode rootNode = objectMapper.readTree(msg.text());
        Integer action = rootNode.get("action").asInt();
        // 取出資料部分,不同的消息類型,資料部分對應的泛型不一樣
        JsonNode dataNode = rootNode.get("data");
        Channel channel = ctx.channel();

        // 判斷消息類型
        // 根據不同的消息類型,處理不同的業務
        if (action.equals(MsgActionEnum.BIND.type)) {
            // 1、當websocket第一次open的時候,初始化channel,把channel和userId關聯起來
            // 如果是CONNECT類型,與前端約定,data部分是token
            String token = objectMapper.treeToValue(dataNode, String.class);

            // 先驗證是否過期。如果過期會抛出異常,全局捕獲。之後的代碼不會執行
            JwtUtils.verifyJwt(token, JwtUtils.DEFAULT_SECRET);
            // 如果沒有抛出異常,表示token有效。則在Redis中尋找對應的登入資訊
            String userId = JwtUtils.getClaimByKey(token, "userId");
            String redisToken = (String) redisUtils.get("user:" + userId);

            if (!StringUtils.isEmpty(redisToken) && token.equals(redisToken)) {
                UserChannelRepository.bind(userId, channel);
                // 查詢是否有未簽收的消息,如果有,就一次性全部推送(并不是逐條推送)
                List<SingleChatMsgResponse> unsignedMsgList = chatMsgService.getUnsignedMsg(userId);
                if (unsignedMsgList.size() > 0) { // 不為空才推送
                    MsgModel<List<SingleChatMsgResponse>> model = new MsgModel<>();
                    model.setAction(MsgActionEnum.CHAT.type);
                    model.setData(unsignedMsgList);
                    UserChannelRepository.pushMsg(userId, model);
                }
            }

        } else if (action.equals(MsgActionEnum.KEEP_ALIVE.type)) {
            // 4、心跳類型的消息
            // 假如用戶端程序被正常退出,websocket主動斷開連接配接,那麼服務端對應的channel是會釋放的
            // 但是如果用戶端關閉網絡後,重新開機網絡,會導緻服務端會再建立一個channel
            // 而舊的channel已經沒用了,但是并沒有被移除
            log.info("收到來自于channel {} 的心跳包", channel.id().asLongText());

        } else if (UserChannelRepository.isBind(channel)) {
            // 其他類型的消息需要綁定後才會處理

            if (action.equals(MsgActionEnum.CHAT.type)) {
                // 2、聊天類型的消息,把消息儲存到資料庫,同時标記消息狀态為[未簽收]
                SingleChatMsgRequest data = objectMapper.treeToValue(dataNode, SingleChatMsgRequest.class);
                // 由于是通過websocket,而并非http協定,是以并沒有經過SpringMVC的參數綁定流程。此處需要我們自己轉義
                data.setContent(HtmlUtils.htmlEscape(data.getContent(), "UTF-8"));

                // 對于聊天消息,channel所綁定的user是發送者
                String senderId = UserChannelRepository.getUserId(channel);
                // 如果是空的,說明綁定失敗了(可能是token過期了)。不做處理
                if (!StringUtils.isEmpty(senderId) &&
                        // 且接收者是我的好友
                        !ObjectUtils.isEmpty(friendService.getMyOneFriend(senderId, data.getReceiverId()))) {

                    // 往消息表插入資料
                    ChatMsg chatMsg = chatMsgService.saveMsg(senderId, data);

                    // 建構消息實體
                    MsgModel<List<SingleChatMsgResponse>> model = new MsgModel<>();
                    model.setAction(MsgActionEnum.CHAT.type);
                    List<SingleChatMsgResponse> unsignedMsgList = new ArrayList<>();
                    unsignedMsgList.add(new SingleChatMsgResponse(chatMsg));
                    model.setData(unsignedMsgList);

                    // 推送消息
                    UserChannelRepository.pushMsg(data.getReceiverId(), model);
                }

            } else if (action.equals(MsgActionEnum.SIGNED.type)) {
                // 3、簽收消息的類型。針對具體的消息進行簽收,修改資料庫對應的消息狀态為[已簽收]
                // 簽收狀态并非是指使用者有沒有讀了消息。而是消息是否已經被推送到達使用者的手機裝置
                // 在簽收類型的消息中,代表需要簽收的消息的id。多個id之間用,分隔
                String msgIdsStr = objectMapper.treeToValue(dataNode, String.class);

                // 對于要簽收類型消息,隻有是我收到的消息,我才能簽收。是以我是接收者
                String receiverId = UserChannelRepository.getUserId(channel);
                if (!StringUtils.isEmpty(msgIdsStr)) {
                    String[] msgIds = msgIdsStr.split(",");
                    if (!ObjectUtils.isEmpty(msgIds)) {
                        chatMsgService.signMsg(receiverId, msgIds);
                    }
                }
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        UserChannelRepository.add(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        UserChannelRepository.remove(ctx.channel());
//        logger.info("剩餘通道個數:{}", UserChannelRepository.CHANNEL_GROUP.size());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        UserChannelRepository.remove(ctx.channel());
    }
}
           

繼續閱讀