文法樹生成及執行涉及的UML圖

HTTP Handler 接收Query請求
調用
Interpreters::executeQuery.h::executeQuery(…)
調用
Interpreters::executeQuery.h::executeQueryImpl(…)
生成
ASTPtr ast = Parsers::ParserQuery::parseImpl(…)
using ASTPtr = std::shared_ptr<IAST>
IAST是所有SQL語句解析後,生成的抽象文法樹的公共接口,例如可以生成一個ASTSelectWithUnionQuery類型的文法樹。
生成
auto interpreter = InterpreterFactory::get(ast, context, stage);
封裝ASTPtr為IInterpreter,例如Query語句會封裝成一個InterpreterSelectQuery類的對象,在生成此對象的執行個體時,也會對這個AST做一些簡單的分析和優化,例如PREWHERE和WHERE的調整、輸入資料的類型推斷等。
BlockIO res = interpreter->execute();
BlockIO封裝了目前QueryPipeline執行個體,以及相應的輸入、輸出流的指針,友善根據Pipeline中的每一個IProcessor執行個體的狀态,觸發流的讀寫以及注冊流上的事件,比如Query工作流完成事件。
BlockIO InterpreterSelectQuery::execute()
{
BlockIO res;
QueryPlan query_plan;
buildQueryPlan(query_plan);
res.pipeline = std::move(*query_plan.buildQueryPipeline());
return res;
}
聚合函數讀取資料及執行過程
以uniq(…)方法為例,簡單講述聚合函數執行add(…)方法的過程:
Aggregator::executeOnBlock(…)
調用
Aggregator::executeWithoutKeyImpl(…)
調用
AggregateFunctionInstruction::IAggregateFunction::addBatchSinglePlace(…)
調用
IAggregateFunction::addBatchSinglePlace(…)
調用具體子類的
AggregateFunctionUniq::add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena)
這個方法可以用來聚合所有的資料,
對于聚合操作,在解析計劃樹時,會向Pipeline中添加一個MergingAggregatedStep,用于将所有處理後的資料聚合成一個Block。
如果所有的Blocks都在自己的資料集上聚合完成,MergingAggregatedStep中選擇使用不同的Transformer來完成所有已經聚合了的Block間的聚合操作,例如如果在記憶體充足的情況下,可以基于記憶體作排序,最終傳回排序且聚合的一個Block;如果不足則使用Merge-sort過程,通過溢出檔案的方式完成聚合。
Select Query語句的Parse過程
從SELECT語句到AST Node的過程,使用Parser*字首的類完成。
假設有一個如下的QUERY語句:
TCPHandler類
以TCP傳輸模式接收用戶端的消息,這裡指處理SQL語句的Server端類,它的核心方法如下:
void TCPHandler::runImpl() {
// 配置Socket
// 檢查消息體
// Authenticaiton查檢及配置
Settings connection_settings = connection_context.getSettings();
sendHello();
connection_context.setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); });
while (true)
{
/// We are waiting for a packet from the client. Thus, every `poll_interval` seconds check whether we need to shut down.
{
Stopwatch idle_time;
while (!server.isCancelled() && !static_cast<ReadBufferFromPocoSocket &>(*in).poll(
std::min(connection_settings.poll_interval, connection_settings.idle_connection_timeout) * 1000000))
{
if (idle_time.elapsedSeconds() > connection_settings.idle_connection_timeout)
{
LOG_TRACE(log, "Closing idle connection");
return;
}
}
}
/// If we need to shut down, or client disconnects.
if (server.isCancelled() || in->eof())
break;
/// Set context of request.
query_context = connection_context;
Stopwatch watch;
state.reset();
/// Initialized later.
std::optional<CurrentThread::QueryScope> query_scope;
/** An exception during the execution of request (it must be sent over the network to the client).
* The client will be able to accept it, if it did not happen while sending another packet and the client has not disconnected yet.
*/
std::optional<DB::Exception> exception;
bool network_error = false;
bool send_exception_with_stack_trace = true;
try
{
/// If a user passed query-local timeouts, reset socket to initial state at the end of the query
SCOPE_EXIT({state.timeout_setter.reset();});
/** If Query - process it. If Ping or Cancel - go back to the beginning.
* There may come settings for a separate query that modify `query_context`.
*/
if (!receivePacket())
continue;
query_scope.emplace(*query_context);
send_exception_with_stack_trace = query_context->getSettingsRef().calculate_text_stack_trace;
/// Should we send internal logs to client?
const auto client_logs_level = query_context->getSettingsRef().send_logs_level;
/// 配置query執行時的上下文環境,例如setExternalTablesInitializer,setInputInitializer等
...
customizeContext(*query_context);
bool may_have_embedded_data = client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_CLIENT_SUPPORT_EMBEDDED_DATA;
/// 開始執行Query語句,從解析到後生成實體計劃樹
state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data);
// 開始監聽過程
after_check_cancelled.restart();
after_send_progress.restart();
if (state.io.out)
{
state.need_receive_data_for_insert = true;
processInsertQuery(connection_settings);
}
else if (state.need_receive_data_for_input) // It implies pipeline execution
{
// 在這裡調用executeQuery傳回的結果,觸發Pipeline的執行
/// It is special case for input(), all works for reading data from client will be done in callbacks.
auto executor = state.io.pipeline.execute();
executor->execute(state.io.pipeline.getNumThreads());
}
else if (state.io.pipeline.initialized())
processOrdinaryQueryWithProcessors();
else if (state.io.in)
processOrdinaryQuery();
// 執行完成後的收尾工作
state.io.onFinish();
/// Do it before sending end of stream, to have a chance to show log message in client.
query_scope->logPeakMemoryUsage();
if (state.is_connection_closed)
break;
sendLogs();
sendEndOfStream();
/// QueryState should be cleared before QueryScope, since otherwise
/// the MemoryTracker will be wrong for possible deallocations.
/// (i.e. deallocations from the Aggregator with two-level aggregation)
state.reset();
query_scope.reset();
}
catch (const Exception & e)
{ ... /// 處理各種異常 }
catch (...)
{
state.io.onException();
exception.emplace("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
}
/// 處理QUERY執行後的收尾工作,例如發送日志和清理各種執行時的環境資訊
...
watch.stop();
LOG_DEBUG(log, "Processed in {} sec.", watch.elapsedSeconds());
/// It is important to destroy query context here. We do not want it to live arbitrarily longer than the query.
query_context.reset();
if (network_error)
break;
}
}
上面的方法中
executeQuery(...)
是query語句的入口,它的實作類在
executeQuery.cpp
檔案中。
QueryProcessingStage
Query語句執行的階段,它定義在
QueryProcessingStage.h
檔案中,如下:
/// Up to what stage the SELECT query is executed or needs to be executed.
namespace QueryProcessingStage
{
/// Numbers matter - the later stage has a larger number.
///
/// It is part of Protocol ABI, add values only to the end.
/// Also keep in mind that the code may depends on the order of fields, so be double aware when you will add new values.
enum Enum
{
/// Only read/have been read the columns specified in the query.
FetchColumns = 0,
/// Until the stage where the results of processing on different servers can be combined.
WithMergeableState = 1,
/// Completely.
Complete = 2,
/// Until the stage where the aggregate functions were calculated and finalized.
///
/// It is used for auto distributed_group_by_no_merge optimization for distributed engine.
/// (See comments in StorageDistributed).
WithMergeableStateAfterAggregation = 3,
MAX = 4,
};
}
executeQuery.cpp
下面的方法是在TCPHandler.cpp檔案中的runImp()方法中調用的,用來執行一條query語句。
BlockIO executeQuery(
// query語句字元串,例如select語句、insert語句、create語句等
const String & query,
// 上下文管理器,是從TCPHandler方法中connection_context中得到的,裡面存放了各種在Query運作時可配置的參數資訊
Context & context,
bool internal,
// query語句的執行狀态
QueryProcessingStage::Enum stage,
// 如果是Insert語句的話,待插入的資料是跟在Insert語句之後的,如果沒有話,就不是一條insert語句
bool may_have_embedded_data)
{
ASTPtr ast;
BlockIO streams;
std::tie(ast, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context,
internal, stage, !may_have_embedded_data, nullptr);
if (const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()))
{
String format_name = ast_query_with_output->format
? getIdentifierName(ast_query_with_output->format)
: context.getDefaultFormat();
if (format_name == "Null")
streams.null_format = true;
}
return streams;
}
executeQuery(…)方法又是調用如下方法完成工作的:
static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
const char * begin,
const char * end,
Context & context,
bool internal,
QueryProcessingStage::Enum stage,
bool has_query_tail,
ReadBuffer * istr)
{
const auto current_time = std::chrono::system_clock::now();
/// If we already executing query and it requires to execute internal query, than
/// don't replace thread context with given (it can be temporary). Otherwise, attach context to thread.
if (!internal)
{
context.makeQueryContext();
CurrentThread::attachQueryContext(context);
}
const Settings & settings = context.getSettingsRef();
/// 執行個體化一個ParserQuery對象,用來解析SQL語句,并生成一棵AST樹
ParserQuery parser(end);
ASTPtr ast;
const char * query_end;
/// Don't limit the size of internal queries.
size_t max_query_size = 0;
if (!internal)
max_query_size = settings.max_query_size;
try
{
/// TODO Parser should fail early when max_query_size limit is reached.
ast = parseQuery(parser, begin, end, "", max_query_size, settings.max_parser_depth);
/// Interpret SETTINGS clauses as early as possible (before invoking the corresponding interpreter),
/// to allow settings to take effect.
if (const auto * select_query = ast->as<ASTSelectQuery>())
{
if (auto new_settings = select_query->settings())
InterpreterSetQuery(new_settings, context).executeForCurrentContext();
}
else if (const auto * select_with_union_query = ast->as<ASTSelectWithUnionQuery>())
{
if (!select_with_union_query->list_of_selects->children.empty())
{
if (auto new_settings = select_with_union_query->list_of_selects->children.back()->as<ASTSelectQuery>()->settings())
InterpreterSetQuery(new_settings, context).executeForCurrentContext();
}
}
else if (const auto * query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()))
{
if (query_with_output->settings_ast)
InterpreterSetQuery(query_with_output->settings_ast, context).executeForCurrentContext();
}
auto * insert_query = ast->as<ASTInsertQuery>();
if (insert_query && insert_query->settings_ast)
InterpreterSetQuery(insert_query->settings_ast, context).executeForCurrentContext();
if (insert_query && insert_query->data)
{
query_end = insert_query->data;
insert_query->has_tail = has_query_tail;
}
else
{
query_end = end;
}
}
catch (...)
{
/// Anyway log the query.
String query = String(begin, begin + std::min(end - begin, static_cast<ptrdiff_t>(max_query_size)));
auto query_for_logging = prepareQueryForLogging(query, context);
logQuery(query_for_logging, context, internal);
if (!internal)
{
onExceptionBeforeStart(query_for_logging, context, time_in_microseconds(current_time), ast);
}
throw;
}
/// 下面的代碼是解析AST樹,生成一棵實體計劃樹,實際上是QueryPipeline的執行個體對象,它是可執行的流水線,是一種嵌套的結構。
/// QueryPipeline可以認為是對Pipe執行個體的封裝,而Pipe是對資料集上的一組transform操作,這些Pipe單元都必然有相同的資料header。
...
parseQuery.cpp
此檔案中定義了一系的全局的公共方法,上面講到的
executeQuery.cpp
檔案中調用的
parseQuery(...)
方法,就是在這個檔案中定義,而
parseQuery(...)
方法實際上又是調用一開始建立的
ParserQuery
執行個體的方法完成工作的。
ParserQuery.cpp
繼承關系:class ParserQuery : public IParserBase : public IParser
從前面展示的代碼可以看到,執行Query語句的第一步是對SQL字元串的解析,生成AST樹,而parse過程的入口就是此檔案中定義的
parseImpl(...)
方法,代碼如下:
bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserQueryWithOutput query_with_output_p;
ParserInsertQuery insert_p(end);
ParserUseQuery use_p;
ParserSetQuery set_p;
ParserSystemQuery system_p;
ParserCreateUserQuery create_user_p;
ParserCreateRoleQuery create_role_p;
ParserCreateQuotaQuery create_quota_p;
ParserCreateRowPolicyQuery create_row_policy_p;
ParserCreateSettingsProfileQuery create_settings_profile_p;
ParserDropAccessEntityQuery drop_access_entity_p;
ParserGrantQuery grant_p;
ParserSetRoleQuery set_role_p;
ParserExternalDDLQuery external_ddl_p;
bool res = query_with_output_p.parse(pos, node, expected)
|| insert_p.parse(pos, node, expected)
|| use_p.parse(pos, node, expected)
|| set_role_p.parse(pos, node, expected)
|| set_p.parse(pos, node, expected)
|| system_p.parse(pos, node, expected)
|| create_user_p.parse(pos, node, expected)
|| create_role_p.parse(pos, node, expected)
|| create_quota_p.parse(pos, node, expected)
|| create_row_policy_p.parse(pos, node, expected)
|| create_settings_profile_p.parse(pos, node, expected)
|| drop_access_entity_p.parse(pos, node, expected)
|| grant_p.parse(pos, node, expected)
|| external_ddl_p.parse(pos, node, expected);
return res;
}
從上面的代碼可以看到,ClickHouse目前支援所有的Query句型,包括DML、DDL、DQL,而ParserQuery對象,負責逐個嘗試解析,一旦遇到能夠解析完成的句型,就傳回成功,這種實作方式自然會有一定的性能損耗。
這裡我們是以SELECT句型分析的,是以這裡會在執行完
ParserQueryWithOutput
的解析後,傳回成功。
ParserQueryWithOutput.cpp
ParserInsertQuery類是繼承自IParserBase類的,是以調用
ParserQueryWithOutput::parse(...)
方法,最終會調用
parseImpl(...)
方法完成工作代碼如下:
bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserShowTablesQuery show_tables_p;
ParserSelectWithUnionQuery select_p;
ParserTablePropertiesQuery table_p;
ParserDescribeTableQuery describe_table_p;
ParserShowProcesslistQuery show_processlist_p;
ParserCreateQuery create_p;
ParserAlterQuery alter_p;
ParserRenameQuery rename_p;
ParserDropQuery drop_p;
ParserCheckQuery check_p;
ParserOptimizeQuery optimize_p;
ParserKillQueryQuery kill_query_p;
ParserWatchQuery watch_p;
ParserShowAccessQuery show_access_p;
ParserShowAccessEntitiesQuery show_access_entities_p;
ParserShowCreateAccessEntityQuery show_create_access_entity_p;
ParserShowGrantsQuery show_grants_p;
ParserShowPrivilegesQuery show_privileges_p;
ParserExplainQuery explain_p;
ASTPtr query;
bool parsed =
explain_p.parse(pos, query, expected)
|| select_p.parse(pos, query, expected)
|| show_create_access_entity_p.parse(pos, query, expected) /// should be before `show_tables_p`
|| show_tables_p.parse(pos, query, expected)
|| table_p.parse(pos, query, expected)
|| describe_table_p.parse(pos, query, expected)
|| show_processlist_p.parse(pos, query, expected)
|| create_p.parse(pos, query, expected)
|| alter_p.parse(pos, query, expected)
|| rename_p.parse(pos, query, expected)
|| drop_p.parse(pos, query, expected)
|| check_p.parse(pos, query, expected)
|| kill_query_p.parse(pos, query, expected)
|| optimize_p.parse(pos, query, expected)
|| watch_p.parse(pos, query, expected)
|| show_access_p.parse(pos, query, expected)
|| show_access_entities_p.parse(pos, query, expected)
|| show_grants_p.parse(pos, query, expected)
|| show_privileges_p.parse(pos, query, expected);
if (!parsed)
return false;
...
/// 其它的收尾工作
}
從上面的代碼可以看到,一條Query句型,又被細分為不同的語句,例如show、explain、select等,是以這裡繼續跟蹤
ParserSelectWithUnionQuery
類分析。
ParserSelectWithUnionQuery.cpp
一條SELECT語句,又有兩中不同的表示形式,即單句型和多句型,即存在
UNION ALL
關鍵字時,SELECT語句是多句型的,否則是單句型的。
UNION ALL
的存在,表示這條件SQL語句可以解析為多條可以
單獨執行
的QUERY語句,是以這裡以
ParserList
對象儲存可能存在的、
并行語句
,代碼如下:
bool ParserSelectWithUnionQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ASTPtr list_node;
ParserList parser(std::make_unique<ParserUnionQueryElement>(), std::make_unique<ParserKeyword>("UNION ALL"), false);
if (!parser.parse(pos, list_node, expected))
return false;
auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
node = select_with_union_query;
select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();
select_with_union_query->children.push_back(select_with_union_query->list_of_selects);
// flatten inner union query
for (auto & child : list_node->children)
getSelectsFromUnionListNode(child, select_with_union_query->list_of_selects->children);
return true;
}
從上面可以看到,不論有沒有
UNION ALL
關鍵字存在,一條SELECT語句的完整解析過程,又是交由
ParserUnionQueryElement
對象處理的。
ParserUnionQueryElement.cpp
這個類的解析方法,僅僅是建立一個新的
ParserSelectQuery
對象,然後傳回它parse後的結果。
bool ParserUnionQueryElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
/// ParserSubquery 嘗試解析子句型,即由(和)包含的子句
/// ParserSelectQuery 嘗試解析SELECT語句
if (!ParserSubquery().parse(pos, node, expected) && !ParserSelectQuery().parse(pos, node, expected))
return false;
if (const auto * ast_subquery = node->as<ASTSubquery>())
node = ast_subquery->children.at(0);
return true;
}
ParserSelectQuery.cpp
終于到了一個具體的SELECT語句的解析過程,它的方法定義如下:
bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
auto select_query = std::make_shared<ASTSelectQuery>();
node = select_query;
ParserKeyword s_select("SELECT");
ParserKeyword s_distinct("DISTINCT");
ParserKeyword s_from("FROM");
ParserKeyword s_prewhere("PREWHERE");
ParserKeyword s_where("WHERE");
ParserKeyword s_group_by("GROUP BY");
ParserKeyword s_with("WITH");
ParserKeyword s_totals("TOTALS");
ParserKeyword s_having("HAVING");
ParserKeyword s_order_by("ORDER BY");
ParserKeyword s_limit("LIMIT");
ParserKeyword s_settings("SETTINGS");
ParserKeyword s_by("BY");
ParserKeyword s_rollup("ROLLUP");
ParserKeyword s_cube("CUBE");
ParserKeyword s_top("TOP");
ParserKeyword s_with_ties("WITH TIES");
ParserKeyword s_offset("OFFSET");
ParserKeyword s_fetch("FETCH");
ParserKeyword s_only("ONLY");
ParserKeyword s_row("ROW");
ParserKeyword s_rows("ROWS");
ParserKeyword s_first("FIRST");
ParserKeyword s_next("NEXT");
ParserNotEmptyExpressionList exp_list(false);
ParserNotEmptyExpressionList exp_list_for_with_clause(false);
ParserNotEmptyExpressionList exp_list_for_select_clause(true); /// Allows aliases without AS keyword.
ParserExpressionWithOptionalAlias exp_elem(false);
ParserOrderByExpressionList order_list;
ParserToken open_bracket(TokenType::OpeningRoundBracket);
ParserToken close_bracket(TokenType::ClosingRoundBracket);
ASTPtr with_expression_list;
ASTPtr select_expression_list;
ASTPtr tables;
ASTPtr prewhere_expression;
ASTPtr where_expression;
ASTPtr group_expression_list;
ASTPtr having_expression;
ASTPtr order_expression_list;
ASTPtr limit_by_length;
ASTPtr limit_by_offset;
ASTPtr limit_by_expression_list;
ASTPtr limit_offset;
ASTPtr limit_length;
ASTPtr top_length;
ASTPtr settings;
/// WITH expr list
{
if (s_with.ignore(pos, expected))
{
if (!ParserList(std::make_unique<ParserWithElement>(), std::make_unique<ParserToken>(TokenType::Comma))
.parse(pos, with_expression_list, expected))
return false;
if (with_expression_list->children.empty())
return false;
}
}
...
從上面的代碼可以看到,ClickHouse基本上支援了常見的SELECT文法,并且也實作了一些自定義的關鍵字。
Select Query語句的Interpret過程
executeQuery.cpp
前面我們知道這個檔案中定義的
executeQueryImpl(...)
方法,會産生将一條SQL字元串,解析為一棵AST樹,那接下來就是生成計劃樹的過程,即interpret的過程,代碼如下:
static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
const char * begin,
const char * end,
Context & context,
bool internal,
QueryProcessingStage::Enum stage,
bool has_query_tail,
ReadBuffer * istr)
{
/// 解析SQL字元串,生成AST
...
try
{
auto interpreter = InterpreterFactory::get(ast, context, stage);
std::shared_ptr<const EnabledQuota> quota;
if (!interpreter->ignoreQuota())
{
quota = context.getQuota();
if (quota)
{
quota->used(Quota::QUERIES, 1);
quota->checkExceeded(Quota::ERRORS);
}
}
StreamLocalLimits limits;
if (!interpreter->ignoreLimits())
{
limits.mode = LimitsMode::LIMITS_CURRENT;
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
}
// 調用解釋器的執行方法,生成可以執行的Pipeline執行個體并執行。
// res是BlockIO類型的變量
res = interpreter->execute();
QueryPipeline & pipeline = res.pipeline;
bool use_processors = pipeline.initialized();
if (const auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
{
/// Save insertion table (not table function). TODO: support remote() table function.
auto table_id = insert_interpreter->getDatabaseTable();
if (!table_id.empty())
context.setInsertionTable(std::move(table_id));
}
...
}
...
}
從上面的代碼可以看到,通過
InterpreterFactory
的工廠方法,根據解析生成的AST樹的類型,建立對應的解釋器,這裡會建立一個
InterpreterSelectQuery
的對象,然後調用
InterpreterSelectQuery::execute(..)
方法。
InterpreterSelectQuery.cpp
InterpreterFactory::get(…)方法會傳回一個InterpreterSelectQuery類的執行個體,同時這個類在建立時,就會對整個AST樹進行周遊,完成樹的檢查、及基本的優化調整工作,例如文法異常、表達式錯誤、PREWHERE語句優化、JOIN優化等,更為具體的過程見其cpp檔案中的構造方法。
完成InterpreterSelectQuery對象的建立後,這裡就顯示調用
execute()
方法,開始執行這棵樹。實際這棵樹是不能直接執行的,還需要兩個過程,如下面代碼:
BlockIO InterpreterSelectQuery::execute()
{
BlockIO res;
QueryPlan query_plan;
// 産生生成一棵邏輯計劃樹
buildQueryPlan(query_plan);
// 生成一棵實體計劃樹,即流水線
res.pipeline = std::move(*query_plan.buildQueryPipeline());
return res;
}
buildQueryPlan(query_plan)
方法,會根據解析和調整後的AST樹,生成一棵邏輯樹,即
QueryPlan
,實際上它是一棵
QueryPlanStep
樹,它們的介紹見最後的相關類小節。
buildQueryPlan(...)
方法,内部通過
executeImpl(...)
方法,将AST樹中的結點,根據不同的操作類型,自頂向下搜集結點上的資訊,并建立一個個的QueryPlanStep對象,添加進QueryPlan中,關鍵代碼如下:
void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe)
{
/** Streams of data. When the query is executed in parallel, we have several data streams.
* If there is no GROUP BY, then perform all operations before ORDER BY and LIMIT in parallel, then
* if there is an ORDER BY, then glue the streams using ResizeProcessor, and then MergeSorting transforms,
* if not, then glue it using ResizeProcessor,
* then apply LIMIT.
* If there is GROUP BY, then we will perform all operations up to GROUP BY, inclusive, in parallel;
* a parallel GROUP BY will glue streams into one,
* then perform the remaining operations with one resulting stream.
*/
/// Now we will compose block streams that perform the necessary actions.
auto & query = getSelectQuery();
const Settings & settings = context->getSettingsRef();
auto & expressions = analysis_result;
auto & subqueries_for_sets = query_analyzer->getSubqueriesForSets();
bool intermediate_stage = false;
bool to_aggregation_stage = false;
bool from_aggregation_stage = false;
if (options.only_analyze)
{
auto read_nothing = std::make_unique<ReadNothingStep>(source_header);
query_plan.addStep(std::move(read_nothing));
if (expressions.prewhere_info)
{
auto prewhere_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
expressions.prewhere_info->prewhere_actions,
expressions.prewhere_info->prewhere_column_name,
expressions.prewhere_info->remove_prewhere_column);
prewhere_step->setStepDescription("PREWHERE");
query_plan.addStep(std::move(prewhere_step));
// To remove additional columns in dry run
// For example, sample column which can be removed in this stage
if (expressions.prewhere_info->remove_columns_actions)
{
auto remove_columns = std::make_unique<ExpressionStep>(
query_plan.getCurrentDataStream(),
expressions.prewhere_info->remove_columns_actions);
remove_columns->setStepDescription("Remove unnecessary columns after PREWHERE");
query_plan.addStep(std::move(remove_columns));
}
}
}
/// 其它Step的建構過程
...
}
當完成整棵QueryPlan樹的轉換後,僅僅是生成了一個優化後的邏輯計劃樹,還需要将邏輯樹轉換成實體計劃樹,才能真正地開始執行,而這一步的工作是由
QueryPlan::buildQueryPipeline()
方法完成的。
QueryPlan.cpp
buildQueryPipeline()
方法會将邏輯樹中的一個個
QueryPlanStep
結點,自底向上,從左向右轉換成
IProcessor
的執行個體對象,也就是實體計劃樹中的結點,将組織成QueryPipeline的結構。QueryPlan樹的周遊過程如下:
// 邏輯計劃樹樹的一層,Step關聯結構,->表示左邊先于右邊執行,也表示左邊的孩子是右邊
// 一條不包含子句的邏輯樹結構如下,一棵倒立的樹,其中每一個step結點就是一層,所謂的流水線就是自頂向下的父子結構:
// prepared_source_step -> prewhere_step -> remove_columns ->
// row_level_security_step -> before_array_join_step ->
// array_join_step -> before_join_step -> join_step -> where_step ->
// aggregating_step -> partial_sorting -> merge_sorting_step ->
// merging_sorted -> merging_aggregated -> having_step ->
// expression_step -> distinct_step (root結點)
// 其中prepared_source_step表示待讀入的資料源,它可能是一個子句,subquery,如果有子句,則先執行子句。
// 是以上面的流水線最終是如下的樣子:
// subquery --complete--> prepared_source_step -> ...
//
// Pipeline的拼接則是按照邏輯樹自底向上建構的,從root結點開始,最終上面的邏輯計劃樹最終就轉換成了如下的結構:
// Pipeline = initializePipeline -> FilterTransform -> ExpressionTransform -> ... -> DistinctTransform
// 是以一個Pipeline的執行順序為:從左到右。
QueryPipelinePtr QueryPlan::buildQueryPipeline()
{
checkInitialized();
optimize();
// 一個Frame表示一個結點,及到這個結點已經生成的Pipeline對象
// 這裡會采用自底向上,從左到右的方式周遊邏輯計劃樹,也就意味着對于第N層的第I個結點,隻有在遍
// 完第N層的前N-1個結點後,才會周遊到目前結點,同時,由于一條件Query語句可能存在子句且并行,
// 例如(select a union select b),是以這裡使用數組來儲存目前層可能的Pipeline執行個體,同時也作為目前層周遊的完成的檢查條件。
struct Frame
{
Node * node;
QueryPipelines pipelines = {};
};
QueryPipelinePtr last_pipeline;
std::stack<Frame> stack;
stack.push(Frame{.node = root});
while (!stack.empty())
{
auto & frame = stack.top();
if (last_pipeline)
{
// 如果一個Step結點和其孩子結點都周遊完成時,就将這個結點對應的Pipeline對象,
// 添加到目前層的Pipeline隊列中,以便在一個流水線的上層算子,能夠通過數組的長度,
// 來判斷是不是已經處理完目前層的Step算子了。
frame.pipelines.emplace_back(std::move(last_pipeline));
last_pipeline = nullptr;
}
size_t next_child = frame.pipelines.size();
if (next_child == frame.node->children.size())
{
bool limit_max_threads = frame.pipelines.empty();
// 目前結點的所有的子結點都已經周遊完成了,也意味着目前結點的Pipelines建構完成,
// 就将子結點生成的Pipelines綁定到目前Step結點上,這種綁定關系表示目前結點和其
// 子結點會建立的所有IProcessor算子
last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines));
// 從最後一個結點
if (limit_max_threads && max_threads)
last_pipeline->limitMaxThreads(max_threads);
stack.pop();
}
else
/// 建立一個新的子結點的Pipelines結構
stack.push(Frame{.node = frame.node->children[next_child]});
}
for (auto & context : interpreter_context)
last_pipeline->addInterpreterContext(std::move(context));
return last_pipeline;
}
下面舉例說明,如何從一個QueryPlanStep轉換到對應的IProcessor執行個體,從上面的代碼可以看到這個過程應該是在
updatePipeline(...)
方法中完成,例如這裡有過濾條件
dt='20210501
,那麼它就對應一個
FilterStep
的執行個體,它的updatePipeline方法定義如下:
QueryPipelinePtr ITransformingStep::updatePipeline(QueryPipelines pipelines)
{
if (collect_processors)
{
// 收集Pipelines中的第一個Pipeline對象
QueryPipelineProcessorsCollector collector(*pipelines.front(), this);
// 将目前的ITransformingStep的執行個體類對應的IProcessor執行個體,添加到第一個Pipeline對象中
transformPipeline(*pipelines.front());
// detachProcessors()方法會将Pipleline中已經存入的所有IProcessor執行個體更新
// 的Step引用,更新為目前Step,表示
// 同時将更新的IProcessor執行個體集合,指派給目前Step執行個體的,友善在建立Pipeline時
processors = collector.detachProcessors();
}
else
transformPipeline(*pipelines.front());
return std::move(pipelines.front());
}
下面舉例一個過濾算子從邏輯結點到實體結點的建構過程,代碼如下:
void FilterStep::transformPipeline(QueryPipeline & pipeline)
{
auto expression = std::make_shared<ExpressionActions>(actions_dag);
// 向輸入的Pipeline對象中,添加一個過濾算子,即下面的lambda表達式傳回的結果,
// FilterTransform
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
{
bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
return std::make_shared<FilterTransform>(header, expression, filter_column_name, remove_filter_column, on_totals);
});
// 如果輸入的Pipeline對象所包含的輸入流,與目前的算子的輸出結果擁有不同的schema資訊,
// 則必然是調用了某些表達式,将原來的字段類型,轉換成了另外一個類型,是以
// 再向Pipeline對象中追加一個ExpressionTransform操作,用于将之前的字段的類型轉換
// 為新的輸出類型。
if (!blocksHaveEqualStructure(pipeline.getHeader(), output_stream->header))
{
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
pipeline.getHeader().getColumnsWithTypeAndName(),
output_stream->header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto convert_actions = std::make_shared<ExpressionActions>(convert_actions_dag);
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, convert_actions);
});
}
}
FilterStep
更新Pipeline的過程比較簡單,僅僅是建立相應的IProcessor執行個體對象,并添加到Pipeline中的預設分組中即可。
Query語句執行過程中的相關類
QueryPlan.h
可以看到QueryPlan是一棵樹結果,它的每一個結點
Node
,實際上是對
QueryPlanStepPtr
的封裝,而
QueryPlanStepPtr
是指向
QueryPlanStep
類型的對象的指針,類定義如下:
/// A tree of query steps.
/// The goal of QueryPlan is to build QueryPipeline.
/// QueryPlan let delay pipeline creation which is helpful for pipeline-level optimisations.
class QueryPlan
{
public:
QueryPlan();
~QueryPlan();
QueryPlan(QueryPlan &&);
QueryPlan & operator=(QueryPlan &&);
void unitePlans(QueryPlanStepPtr step, std::vector<QueryPlanPtr> plans);
void addStep(QueryPlanStepPtr step);
bool isInitialized() const { return root != nullptr; } /// Tree is not empty
bool isCompleted() const; /// Tree is not empty and root hasOutputStream()
const DataStream & getCurrentDataStream() const; /// Checks that (isInitialized() && !isCompleted())
void optimize();
QueryPipelinePtr buildQueryPipeline();
/// If initialized, build pipeline and convert to pipe. Otherwise, return empty pipe.
Pipe convertToPipe();
void explainPlan(WriteBuffer & buffer, const ExplainPlanOptions & options);
void explainPipeline(WriteBuffer & buffer, const ExplainPipelineOptions & options);
/// Set upper limit for the recommend number of threads. Will be applied to the newly-created pipelines.
/// TODO: make it in a better way.
void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
size_t getMaxThreads() const { return max_threads; }
void addInterpreterContext(std::shared_ptr<Context> context);
/// Tree node. Step and it's children.
struct Node
{
QueryPlanStepPtr step;
std::vector<Node *> children = {};
};
using Nodes = std::list<Node>;
private:
Nodes nodes;
Node * root = nullptr;
void checkInitialized() const;
void checkNotCompleted() const;
/// Those fields are passed to QueryPipeline.
size_t max_threads = 0;
std::vector<std::shared_ptr<Context>> interpreter_context;
};
從它的定義可以看到,除了一些構成樹的基本資訊外,
QueryPlan
還包含了一個
optimize()
方法,它是對邏輯樹進行優化的過程,後面再看它的實作吧。
QueryPipeline.h
Pipe.h
/// Pipe is a set of processors which represents the part of pipeline.
/// Pipe contains a list of output ports, with specified port for totals and specified port for extremes.
/// All output ports have same header.
/// All other ports are connected, all connections are inside processors set.
class Pipe
{
private:
/// Destruction order: processors, header, locks, temporary storages, local contexts
Holder holder;
/// Header is common for all output below.
Block header;
Processors processors;
/// Output ports. Totals and extremes are allowed to be empty.
OutputPortRawPtrs output_ports;
OutputPort * totals_port = nullptr;
OutputPort * extremes_port = nullptr;
/// It is the max number of processors which can be executed in parallel for each step.
/// Usually, it's the same as the number of output ports.
size_t max_parallel_streams = 0;
/// If is set, all newly created processors will be added to this too.
/// It is needed for debug. See QueryPipelineProcessorsCollector.
Processors * collected_processors = nullptr;
/// This methods are for QueryPipeline. It is allowed to complete graph only there.
/// So, we may be sure that Pipe always has output port if not empty.
bool isCompleted() const { return !empty() && output_ports.empty(); }
static Pipe unitePipes(Pipes pipes, Processors * collected_processors);
void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter);
void setOutputFormat(ProcessorPtr output);
friend class QueryPipeline;
}
ITransformingStep.h
繼承結構:class ITransformingStep : public IQueryPlanStep
此類表示這個結點有一個輸入流和一個輸出流。它的許多實作類都在*Step.h頭檔案中定義,如下列舉了邏輯計劃樹中所有可能的結點類型:
.//Processors/QueryPlan/AggregatingStep.h
.//Processors/QueryPlan/AddingConstColumnStep.h
.//Processors/QueryPlan/ExpressionStep.h
.//Processors/QueryPlan/ReadNothingStep.h
.//Processors/QueryPlan/LimitByStep.h
.//Processors/QueryPlan/SettingQuotaAndLimitsStep.h
.//Processors/QueryPlan/ArrayJoinStep.h
.//Processors/QueryPlan/ReverseRowsStep.h
.//Processors/QueryPlan/OffsetStep.h
.//Processors/QueryPlan/AddingDelayedSourceStep.h
.//Processors/QueryPlan/DistinctStep.h
.//Processors/QueryPlan/CubeStep.h
.//Processors/QueryPlan/RollupStep.h
.//Processors/QueryPlan/LimitStep.h
.//Processors/QueryPlan/CreatingSetsStep.h
.//Processors/QueryPlan/TotalsHavingStep.h
.//Processors/QueryPlan/PartialSortingStep.h
.//Processors/QueryPlan/MaterializingStep.h
.//Processors/QueryPlan/FillingStep.h
.//Processors/QueryPlan/FilterStep.h
.//Processors/QueryPlan/UnionStep.h
.//Processors/QueryPlan/MergeSortingStep.h
.//Processors/QueryPlan/AddingMissedStep.h
.//Processors/QueryPlan/IQueryPlanStep.h
.//Processors/QueryPlan/ExtremesStep.h
.//Processors/QueryPlan/MergingSortedStep.h
.//Processors/QueryPlan/ISourceStep.h
.//Processors/QueryPlan/ITransformingStep.h
.//Processors/QueryPlan/MergingAggregatedStep.h
.//Processors/QueryPlan/FinishSortingStep.h
*Transform.h
實體計劃樹中的可能結點類型,基本上和Step結點是一一對應的:
.//Processors/IInflatingTransform.h
.//Processors/OffsetTransform.h
.//Processors/ISimpleTransform.h
.//Processors/IAccumulatingTransform.h
.//Processors/Merges/VersionedCollapsingTransform.h
.//Processors/Merges/AggregatingSortedTransform.h
.//Processors/Merges/SummingSortedTransform.h
.//Processors/Merges/MergingSortedTransform.h
.//Processors/Merges/CollapsingSortedTransform.h
.//Processors/Merges/IMergingTransform.h
.//Processors/Merges/ReplacingSortedTransform.h
.//Processors/Merges/GraphiteRollupSortedTransform.h
.//Processors/LimitTransform.h
.//Processors/Transforms/ArrayJoinTransform.h
.//Processors/Transforms/DistinctTransform.h
.//Processors/Transforms/FillingTransform.h
.//Processors/Transforms/MergeSortingTransform.h
.//Processors/Transforms/PartialSortingTransform.h
.//Processors/Transforms/ExtremesTransform.h
.//Processors/Transforms/JoiningTransform.h
.//Processors/Transforms/CopyTransform.h
.//Processors/Transforms/MaterializingTransform.h
.//Processors/Transforms/ReverseTransform.h
.//Processors/Transforms/AddingMissedTransform.h
.//Processors/Transforms/CubeTransform.h
.//Processors/Transforms/FinishSortingTransform.h
.//Processors/Transforms/LimitByTransform.h
.//Processors/Transforms/AggregatingTransform.h
.//Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h
.//Processors/Transforms/LimitsCheckingTransform.h
.//Processors/Transforms/ExpressionTransform.h
.//Processors/Transforms/AggregatingInOrderTransform.h
.//Processors/Transforms/TotalsHavingTransform.h
.//Processors/Transforms/CreatingSetsTransform.h
.//Processors/Transforms/AddingConstColumnTransform.h
.//Processors/Transforms/RollupTransform.h
.//Processors/Transforms/MergingAggregatedTransform.h
.//Processors/Transforms/SortingTransform.h
.//Processors/Transforms/FilterTransform.h
.//Processors/Transforms/AddingSelectorTransform.h
.//DataStreams/SquashingTransform.h
邏輯樹QueryPlan的優化過程
實際上QueryPlan在建立過程雖然已經有做了一部分優化,但這裡獨立了一個方法專門用于更多的優化,但目前能夠看到的僅僅是Limit的下推,實際上還有許多可以優化的過程,但ClickHouse并沒有實作,是以可以說CH對于簡單的QUERY語句能夠有一個比較好的優化結果,但不善于就會複雜的Query語句。
void QueryPlan::optimize()
{
struct Frame
{
Node * node;
size_t next_child = 0;
};
std::stack<Frame> stack;
stack.push(Frame{.node = root});
while (!stack.empty())
{
auto & frame = stack.top();
if (frame.next_child == 0)
{
/// First entrance, try push down.
if (frame.node->children.size() == 1)
tryPushDownLimit(frame.node->step, frame.node->children.front());
}
if (frame.next_child < frame.node->children.size())
{
stack.push(Frame{frame.node->children[frame.next_child]});
++frame.next_child;
}
else
{
/// Last entrance, try lift up.
if (frame.node->children.size() == 1)
tryLiftUpArrayJoin(frame.node, frame.node->children.front(), nodes);
stack.pop();
}
}
}