天天看點

bin-log-distributor消費資料丢失問題解決記錄

bin-log-distributor項目簡介

bin-log-distributor是凱京科技開源的Mysql資料庫資料變動實時監聽分發中間件,詳情見

碼雲開源位址

,

github開源位址

背景

線上回報有bin-log-distributor用戶端偶爾有丢失資料的情況。

驗證方法

在裝mysql的伺服器上,使用多個線程并發循環對bin-log-distributor監控的資料表進行增、删、改。

預期

bin-log-distributor用戶端出現資料丢失。

定位

分析用戶端收到的資料,發現不僅有資料丢失,還有資料重複消費。

将用戶端日志列印更仔細,發現以下兩點問題:

  1. 從redis隊列peek資料後,remove抛出如下異常:
    bin-log-distributor消費資料丢失問題解決記錄
  2. Redis隊列資料沒消費完其他線程異常進行了消費:
    bin-log-distributor消費資料丢失問題解決記錄

分析如下消費redis資料方法源碼:

private void doRunWithLock() {
    RLock rLock = redissonClient.getLock(dataKeyLock);
    EventBaseDTO dto;
    try {
        RQueue<EventBaseDTO> queue = redissonClient.getQueue(dataKey);
        // 嘗試加鎖,最多等待20秒,上鎖以後30秒自動解鎖
        boolean lockRes = rLock.tryLock(20 * 1000, 3 * retryInterval,TimeUnit.MILLISECONDS);
        //拿到鎖且 隊列不為空 進入
        if (lockRes && !queue.isEmpty()) {
            //讓這個線程把隊列裡的全部處理完吧
            rLock.lock();
            //DATA_KEY_IN_PROCESS.add(dataKey);
            while ((dto = queue.peek()) != null) {
                //處理完畢,把資料從隊列摘除
                boolean handleRes = doHandleWithLock(dto, 0);
                if (handleRes) {
                    queue.remove();
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
        log.severe("接收處理資料失敗:" + e.toString());
    } finally {
        rLock.forceUnlock();
        rLock.delete();
        //DATA_KEY_IN_PROCESS.remove(dataKey);
    }
}           

猜測是redis RLock并沒有效,于是在rLock.tryLock() 和 rLock.delete() 方法後面分别列印出 acquireLock 和 releaseLock的日志,再次執行測試,得到如下日志:

bin-log-distributor消費資料丢失問題解決記錄

發現 pool-2-thread-1線程成功擷取鎖後,pool-2-thread-2擷取鎖失敗,并立刻将pool-2-thread-1 持有的鎖release了。結合源碼發現pool-2-thread-2擷取鎖失敗後,執行了finally代碼塊中的rLock.forceUnlock()方法,查閱資料發現forceUnlock()方法可以直接釋放相同key的其他線程持有的鎖。

至此,定位到問題為:每次收到資料起一個線程執行doRunWithLock()方法,當資料庫操作較頻繁時,一個線程擷取到的redis鎖會被另一個線程release掉,導緻redis鎖内部本來應該是單例執行的消費redis隊列資料變成了多個線程同時執行,進而出現多個線程消費隊列資料沖突,引發資料丢失和重複消費的問題。

解決

確定每個線程擷取到的Redis鎖隻能由目前線程release(),更改後的代碼如下:

private void doRunWithLock() {
    RLock rLock = redissonClient.getLock(dataKeyLock);
    EventBaseDTO dto;
    boolean lockRes = false;
    try {
        // 嘗試加鎖,最多等待50ms(防止過多線程等待)
        // 上鎖以後6個小時自動解鎖(防止redis隊列太長,目前拿到鎖的線程處理時間過長)
        lockRes = rLock.tryLock(50, 6 * 3600 * 1000, TimeUnit.MILLISECONDS);
        if (!lockRes) {
            return;
        }
        DATA_KEY_IN_PROCESS.add(dataKey);
        //拿到鎖之後再擷取隊列
        RQueue<EventBaseDTO> queue = redissonClient.getQueue(dataKey);
        if (!queue.isExists() || queue.isEmpty()) {
            return;
        }
        //拿到鎖且 隊列不為空 進入
        while ((dto = queue.peek()) != null) {
            //處理完畢,把資料從隊列摘除
            boolean handleRes = doHandleWithLock(dto, 0);
            if (handleRes) {
                queue.remove();
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
        log.severe("接收處理資料失敗:" + e.toString());
 
    } finally {
        //forceUnlock是可以釋放别的線程拿到的鎖的,需要判斷是否是目前線程持有的鎖
        if (lockRes) {
            rLock.forceUnlock();
            rLock.delete();
            DATA_KEY_IN_PROCESS.remove(dataKey);
        }
    }
}           

回歸

更改後執行繼續執行相同的測試用例,得到如下結果日志:

bin-log-distributor消費資料丢失問題解決記錄

pool-2-thread-2在擷取到redis鎖之後一直持續消費redis隊列中的資料,其他線程嘗試擷取redis鎖失敗後,不再強制release該鎖,用戶端消費條數、類型結果正确。

完成

修改測試用例,使用更多線程、執行更多資料庫操作,分析消費結果,結果資料條數正确,未出現資料丢失、重複消費等問題。

确認結果回歸結果正确,送出代碼。