bin-log-distributor項目簡介
bin-log-distributor是凱京科技開源的Mysql資料庫資料變動實時監聽分發中間件,詳情見
碼雲開源位址,
github開源位址背景
線上回報有bin-log-distributor用戶端偶爾有丢失資料的情況。
驗證方法
在裝mysql的伺服器上,使用多個線程并發循環對bin-log-distributor監控的資料表進行增、删、改。
預期
bin-log-distributor用戶端出現資料丢失。
定位
分析用戶端收到的資料,發現不僅有資料丢失,還有資料重複消費。
将用戶端日志列印更仔細,發現以下兩點問題:
- 從redis隊列peek資料後,remove抛出如下異常:
bin-log-distributor消費資料丢失問題解決記錄 - Redis隊列資料沒消費完其他線程異常進行了消費:
bin-log-distributor消費資料丢失問題解決記錄
分析如下消費redis資料方法源碼:
private void doRunWithLock() {
RLock rLock = redissonClient.getLock(dataKeyLock);
EventBaseDTO dto;
try {
RQueue<EventBaseDTO> queue = redissonClient.getQueue(dataKey);
// 嘗試加鎖,最多等待20秒,上鎖以後30秒自動解鎖
boolean lockRes = rLock.tryLock(20 * 1000, 3 * retryInterval,TimeUnit.MILLISECONDS);
//拿到鎖且 隊列不為空 進入
if (lockRes && !queue.isEmpty()) {
//讓這個線程把隊列裡的全部處理完吧
rLock.lock();
//DATA_KEY_IN_PROCESS.add(dataKey);
while ((dto = queue.peek()) != null) {
//處理完畢,把資料從隊列摘除
boolean handleRes = doHandleWithLock(dto, 0);
if (handleRes) {
queue.remove();
}
}
}
} catch (Exception e) {
e.printStackTrace();
log.severe("接收處理資料失敗:" + e.toString());
} finally {
rLock.forceUnlock();
rLock.delete();
//DATA_KEY_IN_PROCESS.remove(dataKey);
}
}
猜測是redis RLock并沒有效,于是在rLock.tryLock() 和 rLock.delete() 方法後面分别列印出 acquireLock 和 releaseLock的日志,再次執行測試,得到如下日志:
發現 pool-2-thread-1線程成功擷取鎖後,pool-2-thread-2擷取鎖失敗,并立刻将pool-2-thread-1 持有的鎖release了。結合源碼發現pool-2-thread-2擷取鎖失敗後,執行了finally代碼塊中的rLock.forceUnlock()方法,查閱資料發現forceUnlock()方法可以直接釋放相同key的其他線程持有的鎖。
至此,定位到問題為:每次收到資料起一個線程執行doRunWithLock()方法,當資料庫操作較頻繁時,一個線程擷取到的redis鎖會被另一個線程release掉,導緻redis鎖内部本來應該是單例執行的消費redis隊列資料變成了多個線程同時執行,進而出現多個線程消費隊列資料沖突,引發資料丢失和重複消費的問題。
解決
確定每個線程擷取到的Redis鎖隻能由目前線程release(),更改後的代碼如下:
private void doRunWithLock() {
RLock rLock = redissonClient.getLock(dataKeyLock);
EventBaseDTO dto;
boolean lockRes = false;
try {
// 嘗試加鎖,最多等待50ms(防止過多線程等待)
// 上鎖以後6個小時自動解鎖(防止redis隊列太長,目前拿到鎖的線程處理時間過長)
lockRes = rLock.tryLock(50, 6 * 3600 * 1000, TimeUnit.MILLISECONDS);
if (!lockRes) {
return;
}
DATA_KEY_IN_PROCESS.add(dataKey);
//拿到鎖之後再擷取隊列
RQueue<EventBaseDTO> queue = redissonClient.getQueue(dataKey);
if (!queue.isExists() || queue.isEmpty()) {
return;
}
//拿到鎖且 隊列不為空 進入
while ((dto = queue.peek()) != null) {
//處理完畢,把資料從隊列摘除
boolean handleRes = doHandleWithLock(dto, 0);
if (handleRes) {
queue.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
log.severe("接收處理資料失敗:" + e.toString());
} finally {
//forceUnlock是可以釋放别的線程拿到的鎖的,需要判斷是否是目前線程持有的鎖
if (lockRes) {
rLock.forceUnlock();
rLock.delete();
DATA_KEY_IN_PROCESS.remove(dataKey);
}
}
}
回歸
更改後執行繼續執行相同的測試用例,得到如下結果日志:
pool-2-thread-2在擷取到redis鎖之後一直持續消費redis隊列中的資料,其他線程嘗試擷取redis鎖失敗後,不再強制release該鎖,用戶端消費條數、類型結果正确。
完成
修改測試用例,使用更多線程、執行更多資料庫操作,分析消費結果,結果資料條數正确,未出現資料丢失、重複消費等問題。
确認結果回歸結果正确,送出代碼。