天天看點

TeamTalk源碼分析——群聊技術方案和群未讀計數的實作

群聊分析

TeamTalk源碼分析——群聊技術方案和群未讀計數的實作
  1. app發送群消息到msg_server
  2. msg_server收到後,以本地時間戳設定消息建立時間(用戶端時間不可靠)
  3. 轉發
  4. 向資料庫查詢群ID有效性,非法則直接忽略
  5. 該成員是否在群成員内,非法則直接忽略
  6. 使用者和群的會話是否存在,不存在自動建立一條會話關系
  7. 生成群内唯一消息ID
  8. 寫入資料庫表group_msg中(按照gruopId%8分表)
  9. 傳回ack
  10. 傳回ack(如果有多端登入,廣播msg到多個端,各個端再回複ack确認)
  11. 向db_proxy_server查詢群成員清單
  12. 查資料庫
  13. 接收群成員
  14. 周遊
  15. 線上的直接send
  16. 離線的則推送,先去查詢群推送标志設定。
  17. 查資料庫
  18. 周遊所有離線成員的群推送标志清單
  19. 未設定為免打擾的,直接推送。ios:apns,android:各種推送如華為、小米或者第三方如極光、信鴿等
  20. 離線使用者上線
  21. 拉群消息(和微信不一樣,這裡的政策是:先查會話,點選群後拉最近20條消息展示,然後存儲本地sqlite資料庫。再往上滾,則繼續拉。)

TeamTalkd的這個方案,比較适合入門。采用了擴散讀的方案(群消息隻存儲一份,拉取多次),具體代碼不貼了。

msg_server主要看以下2個檔案:

  1. MsgConn.cpp:_HandleClientMsgData,處理用戶端的消息。
  2. DBServConn.cpp:_HandleMsgData,處理來自于db_proxy_server的響應。

db_proxy_server主要從消息驅動表入手,找到相關邏輯的實作:

  1. HandlerMap.cpp:消息驅動表實作,根據CmdID去調用相關的處理函數,從這裡入手看。
  2. business/GroupMessageModel.cpp:群聊消息處理,包括存儲,未讀計數更新等。

群未讀計數分析

TeamTalk的實作主要分成2塊:

  • 群的總未讀計數im_group_msg(1個群1個key)
  • 成員的未讀計數im_user_group(1個群N個Key)

key的規則如下:

  • 群的總未讀計數(其實就是群消息數):groupID + _im_group_msg ,示例:2_im_group_msg
  • 群成員未讀計數(更準确的說是成員已讀時,群消息數):userID + _ + groupID + _im_user_group,示例:7_2_im_user_group

算法原理如下(假設一個群有A和B共2個成員):

TeamTalk的實作主要分成2塊:

群的總未讀計數im_group_msg(1個群1個key)

成員的未讀計數im_user_group(1個群N個Key)

key的規則如下:

  • 群的總未讀計數:groupID+_im_group_msg ,示例:2_im_group_msg
  • 群成員未讀計數:userID+_+groupID+_im_user_group,示例:7_2_im_user_group

算法原理(假設一個群有A和B共2個成員):

TeamTalk源碼分析——群聊技術方案和群未讀計數的實作
  1. 當A發送了一條消息,記錄total=1,A的offset=1,此時total-A.offset=0,是以A的未讀消息數為0條。B此時上線,redis中沒有B的offset,則B的群消息未讀計數為total數,即1。
  2. 當B點選會話後,清除未讀計數其實就是更新B.offset的過程,把offset記為目前群消息的總數,下次再登入,total-B.offset就為0了,也就是清除了。
  3. 此時A同理,離線期間B發了4條消息,total=5了,total-A.offset=4,就算出來離線期間的群未讀消息數量了。

公式:

成員某群未讀消息總數 = 群消息總數(im_group_msg) - 群成員已讀消息總數(im_user_group)
           

那為什麼要這麼設計,使用已讀總數,而不是未讀總數呢(比如判斷了使用者不線上,未讀數量就+1)?

其實這樣能降低redis的更新頻率,否則200人的群,一個人發了一條消息,redis就要更新199次。上面的設計,隻需要更新2次。

附redis的結構的示例:

TeamTalk源碼分析——群聊技術方案和群未讀計數的實作

發消息代碼:

bool CGroupMessageModel::incMessageCount(uint64_t nUserId, uint32_t nGroupId) {
    bool bRet = false;
    CacheManager *pCacheManager = CacheManager::getInstance();
    CacheConn *pCacheConn = pCacheManager->GetCacheConn("unread");
    if (pCacheConn) {
        // 2002_im_group_msg
        // |——count: +1
        string strGroupKey = int2string(nGroupId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;
        pCacheConn->hincrBy(strGroupKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD, 1);

        // 2002_im_group_msg下所有的key取出來
        map<string, string> mapGroupCount;
        bool bRet = pCacheConn->hgetAll(strGroupKey, mapGroupCount);
        if (bRet) {
            // 1_2002_im_user_group
            // |——count: 1
            string strUserKey =
                    int2string(nUserId) + "_" + int2string(nGroupId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;
            string strReply = pCacheConn->hmset(strUserKey, mapGroupCount);
            if (!strReply.empty()) {
                bRet = true;
            } else {
                ERROR("hmset %s failed !", strUserKey.c_str());
            }
        } else {
            ERROR("hgetAll %s failed!", strGroupKey.c_str());
        }
        pCacheManager->RelCacheConn(pCacheConn);
    } else {
        ERROR("no cache connection for unread");
    }
    return bRet;
}
           

清除未讀計數:

bool CGroupMessageModel::clearMessageCount(uint64_t nUserId, uint32_t nGroupId) {
    bool bRet = false;
    CacheManager *pCacheManager = CacheManager::getInstance();
    CacheConn *pCacheConn = pCacheManager->GetCacheConn("unread");
    if (pCacheConn) {
        // 用總數覆寫offset,即total-offset=0
        string strGroupKey = int2string(nGroupId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;
        map<string, string> mapGroupCount;
        bool bRet = pCacheConn->hgetAll(strGroupKey, mapGroupCount);
        pCacheManager->RelCacheConn(pCacheConn);
        if (bRet) {
            string strUserKey =
                    int2string(nUserId) + "_" + int2string(nGroupId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;
            string strReply = pCacheConn->hmset(strUserKey, mapGroupCount);
            if (strReply.empty()) {
                ERROR("hmset %s failed !", strUserKey.c_str());
            } else {
                bRet = true;
            }
        } else {
            ERROR("hgetAll %s failed !", strGroupKey.c_str());
        }
    } else {
        ERROR("no cache connection for unread");
    }
    return bRet;
}
           

擷取使用者的未讀消息總數:

void CGroupMessageModel::getUnReadCntAll(uint64_t nUserId, uint32_t &nTotalCnt) {
    list<uint32_t> lsGroupId;
    CGroupModel::getInstance()->getUserGroupIds(nUserId, lsGroupId, 0);
    uint32_t nCount = 0;

    CacheManager *pCacheManager = CacheManager::getInstance();
    CacheConn *pCacheConn = pCacheManager->GetCacheConn("unread");
    if (pCacheConn) {
        for (auto it = lsGroupId.begin(); it != lsGroupId.end(); ++it) {
            uint32_t nGroupId = *it;
            string strGroupKey = int2string(nGroupId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;
            string strGroupCnt = pCacheConn->hget(strGroupKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD);
            if (strGroupCnt.empty()) {
//                log("hget %s : count failed !", strGroupKey.c_str());
                continue;
            }
            uint32_t nGroupCnt = (uint32_t) (atoi(strGroupCnt.c_str()));

            string strUserKey =
                    int2string(nUserId) + "_" + int2string(nGroupId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;
            string strUserCnt = pCacheConn->hget(strUserKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD);

            uint32_t nUserCnt = (strUserCnt.empty() ? 0 : ((uint32_t) atoi(strUserCnt.c_str())));
            // 這裡就是上面說的:total - offset = 未讀數量
            if (nGroupCnt >= nUserCnt) {
                nCount = nGroupCnt - nUserCnt;
            }
            if (nCount > 0) {
                nTotalCnt += nCount;
            }
        }
        pCacheManager->RelCacheConn(pCacheConn);
    } else {
        ERROR("no cache connection for unread");
    }
}
           

關于作者

推薦下自己的開源IM,純Golang編寫:

CoffeeChat:

https://github.com/xmcy0011/CoffeeChat

opensource im with server(go) and client(flutter+swift)

參考了TeamTalk、瓜子IM等知名項目,包含服務端(go)和用戶端(flutter+swift),單聊和機器人(小微、圖靈、思知)聊天功能已完成,目前正在研發群聊功能,歡迎對golang和跨平台開發flutter技術感興趣的小夥伴Star加關注。

————————————————

版權聲明:本文為CSDN部落客「許非」的原創文章,遵循CC 4.0 BY-SA版權協定,轉載請附上原文出處連結及本聲明。

原文連結:https://blog.csdn.net/xmcy001122/article/details/109316394

繼續閱讀