
注:以下分析基于開源 v19.15.2.2-stable 版本進行
引言
ClickHouse核心分析系列文章,繼上一篇文章
MergeTree查詢鍊路之後,這次我将為大家介紹MergeTree存儲引擎的異步Merge和Mutation機制。建議讀者先補充上一篇文章的基礎知識,這樣會比較容易了解。
MergeTree Mutation功能介紹
在上一篇系列文章中,我已經介紹過ClickHouse核心中的MergeTree存儲一旦生成一個Data Part,這個Data Part就不可再更改了。是以從MergeTree存儲核心層面,ClickHouse就不擅長做資料更新删除操作。但是絕大部分使用者場景中,難免會出現需要手動訂正、修複資料的場景。是以ClickHouse為使用者設計了一套離線異步機制來支援低頻的Mutation(改、删)操作。
Mutation指令執行
ALTER TABLE [db.]table DELETE WHERE filter_expr;
ALTER TABLE [db.]table UPDATE column1 = expr1 [, ...] WHERE filter_expr;
ClickHouse的方言把Delete和Update操作也加入到了Alter Table的範疇中,它并不支援裸的Delete或者Update操作。當使用者執行一個如上的Mutation操作獲得傳回時,ClickHouse核心其實隻做了兩件事情:
- 檢查Mutation操作是否合法;
- 儲存Mutation指令到存儲檔案中,喚醒一個異步處理merge和mutation的工作線程;
兩者的主體邏輯分别在MutationsInterpreter::validate函數和StorageMergeTree::mutate函數中。
MutationsInterpreter::validate函數dry run一個異步Mutation執行的全過程,其中涉及到檢查Mutation是否合法的判斷原則是列值更新後記錄的分區鍵和排序鍵不能有變化。因為分區鍵和排序鍵一旦發生變化,就會導緻多個Data Part之間之間Merge邏輯的複雜化。剩餘的Mutation執行過程可以看做是打開一個Data Part的BlockInputStream,在這個BlockStream的基礎上封裝删除操作的FilterBlockInputStream,再加上更新操作的ExpressionBlockInputStream,最後把資料通過BlockOutputStream寫回到新的Data Part中。這裡簡單介紹一下ClickHouse的計算層實作,整體上它是一個火山模型的計算引擎,資料的各種filer、投影、join、agg都是通過BlockStrem抽象實作,在BlockStream中資料是按照Block進行傳輸處理的,而Block中的資料又是按照列模式組織,這使得ClickHouse在單列的計算上可以批量化并使用一些SIMD指令加速。BlockOutputStream承擔了MergeTree Data Part列存寫入和索引建構的全部工作,我會在後續的文章中會詳細展開介紹ClickHouse計算層中各類功能的BlockStream,以及BlockOutputStream中建構索引的實作細節。
在Mutation指令的執行過程中,我們可以看到MergeTree會把整條Alter指令儲存到存儲檔案夾下,然後建立一個MergeTreeMutationEntry對象儲存到表的待修改狀态中,最後喚醒一個異步處理merge和 mutation的工作線程。這裡有一個關鍵的問題,因為Mutation的實際操作是異步發生的,在使用者的Alter指令傳回之後仍然會有資料寫入,系統如何在異步訂正的過程中排除掉Alter指令之後寫入的資料呢?下一節中我會介紹MergeTree中Data Part的Version機制,它可以在Data Part級别解決上面的問題。但是因為ClickHouse寫傳入連結路的異步性,ClickHouse仍然無法保證Alter指令前Insert的每條紀錄都被更新,隻能確定Alter指令前已經存在的Data Part都會被訂正,推薦使用者隻用來訂正T+1場景的離線資料。
異步Merge&Mutation
Batch Insert和Mutation的資料一緻性
struct MergeTreePartInfo
{
String partition_id;
Int64 min_block = 0;
Int64 max_block = 0;
UInt32 level = 0;
Int64 mutation = 0; /// If the part has been mutated or contains mutated parts, is equal to mutation version number.
...
/// Get block number that can be used to determine which mutations we still need to apply to this part
/// (all mutations with version greater than this block number).
Int64 getDataVersion() const { return mutation ? mutation : min_block; }
...
bool operator<(const MergeTreePartInfo & rhs) const
{
return std::forward_as_tuple(partition_id, min_block, max_block, level, mutation)
< std::forward_as_tuple(rhs.partition_id, rhs.min_block, rhs.max_block, rhs.level, rhs.mutation);
}
}
在具體展開MergeTree的異步merge和mutation機制之前,先需要詳細介紹一下MergeTree中對Data Part的管理方式。每個Data Part都有一個MergeTreePartInfo對象來儲存它的meta資訊,MergeTreePartInfo類的結構如上方代碼所示。
- partition_id:表示所屬的資料分區id。
- min_block、max_block:blockNumber是資料寫入的一個版本資訊,在上一篇系列文章中講過,使用者每次批量寫入的資料都會生成一個Data Part。同一批寫入的資料會被assign一個唯一的blockNumber,而這個blockNumber是在MergeTree表級别自增的。以及MergeTree在merge多個Data Part的時候會準守一個原則:在同一個資料分區下選擇blockNumber區間相鄰的若幹個Data Parts進行合并,不會出現在同一個資料分區下Data Parts之間的blockNumber區間出現重合。是以Data Part中的min_block和max_block可以表示目前Data Part中資料的版本範圍。
- level:表示Data Part所在的層級,新寫入的Data Part都屬于level 0。異步merge多個Data Part的過程中,系統會選擇其中最大的level + 1作為新Data Part的level。這個資訊可以一定程度反映出目前的Data Part是經曆了多少次merge,但是不能準确表示,核心原因是MergeTree允許多個Data Part跨level進行merge的,為了最終一個資料分區内的資料merge成一個Data Part。
- mutation:和批量寫入資料的版本号機制類似,MergeTree表的mutation指令也會被assign一個唯一的blockNumber作為版本号,這個版本号資訊會儲存在MergeTreeMutationEntry中,是以通過版本号資訊我們可以看出資料寫入和mutation指令之間的先後關系。Data Part中的這個mutation表示的則是目前這個Data Part已經完成的mutation操作,對每個Data Part來說它是按照mutation的blockNumber順序依次完成所有的mutation。
解釋了MergeTreePartInfo類中的資訊含義,我們就可以了解上一節中遺留的異步Mutation如何選擇哪些Data Parts需要訂正的問題。系統可以通過MergeTreePartInfo::getDataVersion() { return mutation ? mutation : min_block }函數來判斷目前Data Part是否需要進行某個mutation訂正,比較兩者version即可。
Merge&Mutation工作任務
ClickHouse核心中異步merge、mutation工作由統一的工作線程池來完成,這個線程池的大小使用者可以通過參數background_pool_size進行設定。線程池中的線程Task總體邏輯如下,可以看出這個異步Task主要做三塊工作:清理殘留檔案,merge Data Parts 和 mutate Data Part。
BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask()
{
....
try
{
/// Clear old parts. It is unnecessary to do it more than once a second.
if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1))
{
{
/// TODO: Implement tryLockStructureForShare.
auto lock_structure = lockStructureForShare(false, "");
clearOldPartsFromFilesystem();
clearOldTemporaryDirectories();
}
clearOldMutations();
}
///TODO: read deduplicate option from table config
if (merge(false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/))
return BackgroundProcessingPoolTaskResult::SUCCESS;
if (tryMutatePart())
return BackgroundProcessingPoolTaskResult::SUCCESS;
return BackgroundProcessingPoolTaskResult::ERROR;
}
...
}
需要清理的殘留檔案分為三部分:過期的Data Part,臨時檔案夾,過期的Mutation指令檔案。如下方代碼所示,MergeTree Data Part的生命周期包含多個階段,建立一個Data Part的時候分兩階段執行Temporary->Precommitted->Commited,淘汰一個Data Part的時候也可能會先經過一個Outdated狀态,再到Deleting狀态。在Outdated狀态下的Data Part仍然是可查的。異步Task在收集Outdated Data Part的時候會根據它的shared_ptr計數來判斷目前是否有查詢Context引用它,沒有的話才進行删除。清理臨時檔案的邏輯較為簡單,在資料檔案夾中周遊搜尋"tmp_"開頭的檔案夾,并判斷建立時長是否超過temporary_directories_lifetime。臨時檔案夾主要在ClickHouse的兩階段送出過程可能造成殘留。最後是清理資料已經全部訂正完成的過期Mutation指令檔案。
enum class State
{
Temporary, /// the part is generating now, it is not in data_parts list
PreCommitted, /// the part is in data_parts, but not used for SELECTs
Committed, /// active data part, used by current and upcoming SELECTs
Outdated, /// not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes
Deleting, /// not active data part with identity refcounter, it is deleting right now by a cleaner
DeleteOnDestroy, /// part was moved to another disk and should be deleted in own destructor
};
Merge邏輯
StorageMergeTree::merge函數是MergeTree異步Merge的核心邏輯,Data Part Merge的工作除了通過背景工作線程自動完成,使用者還可以通過Optimize指令來手動觸發。自動觸發的場景中,系統會根據背景空閑線程的資料來啟發式地決定本次Merge最大可以處理的資料量大小,max_bytes_to_merge_at_min_space_in_pool和max_bytes_to_merge_at_max_space_in_pool參數分别決定當空閑線程數最大時可處理的資料量上限以及隻剩下一個空閑線程時可處理的資料量上限。當使用者的寫入量非常大的時候,應該适當調整工作線程池的大小和這兩個參數。當使用者手動觸發merge時,系統則是根據disk剩餘容量來決定可處理的最大資料量。
接下來介紹merge過程中最核心的邏輯:如何選擇Data Parts進行merge?為了友善了解,這裡先介紹一下Data Parts在MergeTree表引擎中的管理組織方式。上一節中提到的MergeTreePartInfo類中定義了比較操作符,MergeTree中的Data Parts就是按照這個比較操作符進行排序管理,排序鍵是(partition_id, min_block, max_block, level, mutation),索引管理結構如下圖所示:
自動Merge的處理邏輯,首先是通過MergeTreeDataMergerMutator::selectPartsToMerge函數篩選出本次merge要合并的Data Parts,這個篩選過程需要準守三個原則:
- 跨資料分區的Data Part之間不能合并;
- 合并的Data Parts之間必須是相鄰(在上圖的有序組織關系中相鄰),隻能在排序連結清單中按段合并,不能跳躍;
- 合并的Data Parts之間的mutation狀态必須是一緻的,如果Data Part A 後續還需要完成mutation-23而Data Part B後續不需要完成mutation-23(資料全部是在mutation指令之後寫入或者已經完成mutation-23),則A和B不能進行合并;
是以我們上面的Data Parts組織關系邏輯示意圖中,相同顔色的Data Parts是可以合并的。雖然圖中三個不同顔色的Data Parts序列都是可以合并的,但是合并工作線程每次隻會挑選其中某個序列的一小段進行合并(如前文所述,系統會限定每次合并的Data Parts的資料量)。對于如何從這些序列中挑選出最佳的一段區間,ClickHouse抽象出了IMergeSelector類來實作不同的邏輯。目前主要有兩種不同的merge政策:TTL資料淘汰政策和正常政策。
- TTL資料淘汰政策:TTL資料淘汰政策啟用的條件比較苛刻,隻有當某個Data Part中存在資料生命周期逾時需要淘汰,并且距離上次使用TTL政策達到一定時間間隔(預設1小時)。TTL政策也非常簡單,首先挑選出TTL逾時最嚴重Data Part,把這個Data Part所在的資料分區作為要進行資料合并的分區,最後會把這個TTL逾時最嚴重的Data Part前後連續的所有存在TTL過期的Data Part都納入到merge的範圍中。這個政策簡單直接,每次保證優先合并掉最老的存在過期資料的Data Part。
- 正常政策:這裡的選舉政策就比較複雜,基本邏輯是枚舉每個可能合并的Data Parts區間,通過啟發式規則判斷是否滿足合并條件,再有啟發式規則進行算分,選取分數最好的區間。啟發式判斷是否滿足合并條件的算法在SimpleMergeSelector.cpp::allow函數中,其中的主要思想分為以下幾點:系統預設對合并的區間有一個Data Parts數量的限制要求(每5個Data Parts才能合并);如果目前資料分區中的Data Parts出現了膨脹,則适量放寬合并數量限制要求(最低可以兩兩merge);如果參與合并的Data Parts中有很久之前寫入的Data Part,也适量放寬合并數量限制要求,放寬的程度還取決于要合并的資料量。第一條規則是為了提升寫入性能,避免在高速寫入時兩兩merge這種低效的合并方式。最後一條規則則是為了保證随着資料分區中的Data Part老化,老齡化的資料分區内資料全部合并到一個Data Part。中間的規則更多是一種保護手段,防止因為寫入和頻繁mutation的極端情況下,Data Parts出現膨脹。啟發式算法的政策則是優先選擇IO開銷最小的Data Parts區間完成合并,盡快合并掉小資料量的Data Parts是對線上查詢最有利的方式,資料量很大的Data Parts已經有了很較好的資料壓縮和索引效率,合并操作對查詢帶來的成本效益較低。
Mutation邏輯
StorageMergeTree::tryMutatePart函數是MergeTree異步mutation的核心邏輯,主體邏輯如下。系統每次都隻會訂正一個Data Part,但是會聚合多個mutation任務批量完成,這點實作非常的棒。因為在使用者真實業務場景中一次資料訂正邏輯中可能會包含多個Mutation指令,把這多個mutation操作聚合到一起訂正效率上就非常高。系統每次選擇一個排序鍵最小的并且需要訂正Data Part進行操作,本意上就是把資料從前往後進行依次訂正。
Mutation功能是MergeTree表引擎最新推出一大功能,從我個人的角度看在實作完備度上還有一下兩點需要去優化:
- mutation沒有實時可見能力。我這裡的實時可見并不是指在存儲上立即原地更新,而是給使用者提供一種途徑可以立即看到資料訂正後的最終視圖確定訂正無誤。類比在使用CollapsingMergeTree、SummingMergeTree等進階MergeTree引擎時,資料還沒有完全merge到一個Data Part之前,存儲層并沒有一個資料的最終視圖。但是使用者可以通過Final查詢模式,在計算引擎層實時聚合出資料的最終視圖。這個原理對mutation實時可見也同樣适用,在實時查詢中通過FilterBlockInputStream和ExpressionBlockInputStream完成使用者的mutation操作,給使用者提供一個最終視圖。
- mutation和merge互相獨立執行。看完本文前面的分析,大家應該也注意到了目前Data Part的merge和mutation是互相獨立執行的,Data Part在同一時刻隻能是在merge或者mutation操作中。對于MergeTree這種存儲徹底Immutable的設計,資料頻繁merge、mutation會引入巨大的IO負載。實時上merge和mutation操作是可以合并到一起去考慮的,這樣可以省去資料一次讀寫盤的開銷。對資料寫入壓力很大又有頻繁mutation的場景,會有很大幫助。
for (const auto & part : getDataPartsVector())
{
...
size_t current_ast_elements = 0;
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
MutationsInterpreter interpreter(shared_from_this(), it->second.commands, global_context);
size_t commands_size = interpreter.evaluateCommandsSize();
if (current_ast_elements + commands_size >= max_ast_elements)
break;
current_ast_elements += commands_size;
commands.insert(commands.end(), it->second.commands.begin(), it->second.commands.end());
}
auto new_part_info = part->info;
new_part_info.mutation = current_mutations_by_version.rbegin()->first;
future_part.parts.push_back(part);
future_part.part_info = new_part_info;
future_part.name = part->getNewName(new_part_info);
tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, true);
break;
}
最後在經過背景工作線程一輪merge和mutation操作之後,上一節中展示的MergeTree表引擎中的Data Parts可能發生的變化如下圖所示,2020-05-10資料分區下的頭兩個Data Parts被merge到了一起,并且完成了Mutation 37和Mutation 39的資料訂正,新産生的Data Part如紅色所示:
Clickhouse産品連結:
https://www.aliyun.com/product/clickhouseClickHouse核心分析系列文章:
希望通過核心分析系列文章,讓大家更好地了解這款世界領先的列式存儲分析型資料庫。