
A Brief Analysis of the ClickHouse Query Execution Flow

[Figure: UML diagram of the classes involved in syntax-tree generation and execution]


The HTTP handler receives the query request and

calls

Interpreters::executeQuery.h::executeQuery(…)

which calls

Interpreters::executeQuery.h::executeQueryImpl(…)

which produces

ASTPtr ast = Parsers::ParserQuery::parseImpl(…)

using ASTPtr = std::shared_ptr<IAST>

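IAST is the common interface of every abstract syntax tree produced by parsing a SQL statement; parsing a SELECT, for example, produces a syntax tree of type ASTSelectWithUnionQuery.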

and then produces

auto interpreter = InterpreterFactory::get(ast, context, stage);

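This wraps the ASTPtr in an IInterpreter; a SELECT query, for example, is wrapped in an InterpreterSelectQuery object. While this object is being constructed, the AST also goes through some simple analysis and optimization, such as adjusting conditions between PREWHERE and WHERE and inferring the types of the input data.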

BlockIO res = interpreter->execute();

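BlockIO wraps the current QueryPipeline instance together with pointers to the corresponding input and output streams. This makes it convenient to trigger reads and writes according to the state of each IProcessor in the pipeline, and to register events on the streams, such as the query-finished event.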
BlockIO InterpreterSelectQuery::execute()
{
    BlockIO res;
    QueryPlan query_plan;

    buildQueryPlan(query_plan);

    res.pipeline = std::move(*query_plan.buildQueryPipeline());
    return res;
}
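To make the role of BlockIO more concrete, here is a minimal sketch with hypothetical types (ToyPipeline and ToyBlockIO are illustrative, not ClickHouse classes): the interpreter returns an object that bundles an executable chain of stages with a finish callback, and the caller runs the chain and then fires the finish event, much as TCPHandler calls state.io.onFinish() after execution.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a pipeline: an ordered list of stages over rows.
using Rows = std::vector<int>;
using Stage = std::function<Rows(Rows)>;

struct ToyPipeline
{
    std::vector<Stage> stages;

    Rows run(Rows input) const
    {
        for (const auto & stage : stages)
            input = stage(std::move(input));
        return input;
    }
};

// Hypothetical stand-in for BlockIO: the pipeline plus an event callback.
struct ToyBlockIO
{
    ToyPipeline pipeline;
    std::function<void()> finish_callback;

    void onFinish() const
    {
        if (finish_callback)
            finish_callback();
    }
};

// Mimics InterpreterSelectQuery::execute(): build the stages, return them unexecuted.
ToyBlockIO executeToySelect()
{
    ToyBlockIO res;
    res.pipeline.stages.push_back([](Rows rows) {   // plays the role of a FilterTransform
        Rows out;
        for (int v : rows)
            if (v % 2 == 0)
                out.push_back(v);
        return out;
    });
    res.pipeline.stages.push_back([](Rows rows) {   // plays the role of an ExpressionTransform
        for (int & v : rows)
            v *= 10;
        return rows;
    });
    return res;
}
```

Note that nothing runs inside executeToySelect(); as in ClickHouse, building the pipeline and executing it are separate steps performed by different callers.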
           

How aggregate functions read and process data

Taking uniq(…) as an example, here is a brief look at how an aggregate function's add(…) method is reached:

Aggregator::executeOnBlock(…)

calls

Aggregator::executeWithoutKeyImpl(…)

which calls

AggregateFunctionInstruction::IAggregateFunction::addBatchSinglePlace(…)

which calls

IAggregateFunction::addBatchSinglePlace(…)

which in turn calls the concrete subclass's

AggregateFunctionUniq::add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena)

This is the method that actually aggregates the data, one row (row_num) at a time.
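The split between per-row add() and combining partial states can be illustrated with a toy exact uniq. This is a hypothetical sketch: ClickHouse's real uniq state is an approximate, adaptive-sampling hash set kept in Arena memory, whereas here the state is a plain std::unordered_set. add() consumes one row, addBatchSinglePlace() loops over a block into a single state (the no-GROUP-BY case), and merge() combines two partial states.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_set>
#include <vector>

// Hypothetical exact "uniq" state; the real one is approximate.
struct ToyUniqState
{
    std::unordered_set<uint64_t> values;
};

struct ToyUniq
{
    // Like IAggregateFunction::add: consume the value at row_num of the column.
    static void add(ToyUniqState & place, const std::vector<uint64_t> & column, size_t row_num)
    {
        place.values.insert(column[row_num]);
    }

    // Like addBatchSinglePlace: aggregate every row of a block into one state.
    static void addBatchSinglePlace(ToyUniqState & place, const std::vector<uint64_t> & column)
    {
        for (size_t row = 0; row < column.size(); ++row)
            add(place, column, row);
    }

    // Combine two partial states, as when already-aggregated blocks are merged.
    static void merge(ToyUniqState & place, const ToyUniqState & rhs)
    {
        place.values.insert(rhs.values.begin(), rhs.values.end());
    }

    static size_t result(const ToyUniqState & place) { return place.values.size(); }
};
```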

For aggregation, a MergingAggregatedStep is added to the pipeline when the plan tree is built; it merges all of the processed data into a single Block.

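Once every Block has been aggregated over its own data set, MergingAggregatedStep chooses among different transforms to aggregate across the already-aggregated Blocks. When there is enough memory, for example, the sort can be done in memory and a single sorted, aggregated Block is returned; when there is not, a merge-sort process is used instead, completing the aggregation by spilling to files.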
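The spill-and-merge path rests on a k-way merge of sorted runs. Below is a simplified sketch of that technique, assuming each run is an in-memory vector of (key, count) pairs sorted by key that stands in for a spill file; equal keys coming from different runs are combined while merging. The function name mergeSortedRuns is hypothetical, and the sketch does not reproduce ClickHouse's actual transforms.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <queue>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

using Run = std::vector<std::pair<std::string, uint64_t>>;  // sorted by key

// Merge k sorted runs of partial (key, count) aggregates into one sorted run,
// summing the counts of equal keys.
Run mergeSortedRuns(const std::vector<Run> & runs)
{
    // Heap entry: (key, run index, position in run); smallest key on top.
    using Entry = std::tuple<std::string, size_t, size_t>;
    std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> heap;

    for (size_t r = 0; r < runs.size(); ++r)
        if (!runs[r].empty())
            heap.emplace(runs[r][0].first, r, 0);

    Run out;
    while (!heap.empty())
    {
        auto [key, r, pos] = heap.top();
        heap.pop();

        uint64_t count = runs[r][pos].second;
        if (!out.empty() && out.back().first == key)
            out.back().second += count;   // same key from another run: merge the states
        else
            out.emplace_back(key, count);

        if (pos + 1 < runs[r].size())
            heap.emplace(runs[r][pos + 1].first, r, pos + 1);
    }
    return out;
}
```

Only one entry per run sits in the heap at a time, which is why this approach keeps memory bounded by the number of runs rather than by the total data size.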

Parsing a SELECT query

The transformation from a SELECT statement to AST nodes is performed by the classes whose names start with Parser.

Suppose we have a query like the following:

The TCPHandler class

TCPHandler receives client messages over TCP; it is the server-side class that handles SQL statements. Its core method is:

void TCPHandler::runImpl() {
    // Configure the socket

    // Check the message body

    // Authentication checks and configuration

    Settings connection_settings = connection_context.getSettings();

    sendHello();

    connection_context.setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); });

    while (true)
    {
        /// We are waiting for a packet from the client. Thus, every `poll_interval` seconds check whether we need to shut down.
        {
            Stopwatch idle_time;
            while (!server.isCancelled() && !static_cast<ReadBufferFromPocoSocket &>(*in).poll(
                std::min(connection_settings.poll_interval, connection_settings.idle_connection_timeout) * 1000000))
            {
                if (idle_time.elapsedSeconds() > connection_settings.idle_connection_timeout)
                {
                    LOG_TRACE(log, "Closing idle connection");
                    return;
                }
            }
        }

        /// If we need to shut down, or client disconnects.
        if (server.isCancelled() || in->eof())
            break;

        /// Set context of request.
        query_context = connection_context;

        Stopwatch watch;
        state.reset();

        /// Initialized later.
        std::optional<CurrentThread::QueryScope> query_scope;

        /** An exception during the execution of request (it must be sent over the network to the client).
         *  The client will be able to accept it, if it did not happen while sending another packet and the client has not disconnected yet.
         */
        std::optional<DB::Exception> exception;
        bool network_error = false;

        bool send_exception_with_stack_trace = true;

        try
        {
            /// If a user passed query-local timeouts, reset socket to initial state at the end of the query
            SCOPE_EXIT({state.timeout_setter.reset();});

            /** If Query - process it. If Ping or Cancel - go back to the beginning.
             *  There may come settings for a separate query that modify `query_context`.
             */
            if (!receivePacket())
                continue;

            query_scope.emplace(*query_context);

            send_exception_with_stack_trace = query_context->getSettingsRef().calculate_text_stack_trace;

            /// Should we send internal logs to client?
            const auto client_logs_level = query_context->getSettingsRef().send_logs_level;
            
            /// Set up the query's execution context, e.g. setExternalTablesInitializer, setInputInitializer, etc.
            ...

            customizeContext(*query_context);

            bool may_have_embedded_data = client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_CLIENT_SUPPORT_EMBEDDED_DATA;
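            /// Execute the query: from parsing through to building the physical plan (the pipeline)
            state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data);
            // Restart the timers used to check for cancellation and to send progress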
            after_check_cancelled.restart();
            after_send_progress.restart();

            if (state.io.out)
            {
                state.need_receive_data_for_insert = true;
                processInsertQuery(connection_settings);
            }
            else if (state.need_receive_data_for_input) // It implies pipeline execution
            {
                // Run the pipeline returned by executeQuery here to trigger its execution
                /// It is special case for input(), all works for reading data from client will be done in callbacks.
                auto executor = state.io.pipeline.execute();
                executor->execute(state.io.pipeline.getNumThreads());
            }
            else if (state.io.pipeline.initialized())
                processOrdinaryQueryWithProcessors();
            else if (state.io.in)
                processOrdinaryQuery();
            // Finishing work after execution
            state.io.onFinish();

            /// Do it before sending end of stream, to have a chance to show log message in client.
            query_scope->logPeakMemoryUsage();

            if (state.is_connection_closed)
                break;

            sendLogs();
            sendEndOfStream();

            /// QueryState should be cleared before QueryScope, since otherwise
            /// the MemoryTracker will be wrong for possible deallocations.
            /// (i.e. deallocations from the Aggregator with two-level aggregation)
            state.reset();
            query_scope.reset();
        }
        catch (const Exception & e)
        {
            /// Handle the various kinds of exceptions
            ...
        }
        catch (...)
        {
            state.io.onException();
            exception.emplace("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
        }
        /// Finishing work after the query has run, e.g. sending logs and cleaning up the execution environment
        ...

        watch.stop();

        LOG_DEBUG(log, "Processed in {} sec.", watch.elapsedSeconds());

        /// It is important to destroy query context here. We do not want it to live arbitrarily longer than the query.
        query_context.reset();

        if (network_error)
            break;
    }

}
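The if/else-if chain after executeQuery() in runImpl() decides how the returned BlockIO is consumed. A hypothetical condensation of that decision, with illustrative types rather than ClickHouse ones, makes the priority order explicit: an INSERT sink wins, then the input() special case, then the processors pipeline, then the legacy input stream.

```cpp
#include <cassert>

// Hypothetical summary of the BlockIO fields runImpl() inspects.
struct ToyQueryState
{
    bool has_out = false;                       // state.io.out
    bool need_receive_data_for_input = false;   // state.need_receive_data_for_input
    bool pipeline_initialized = false;          // state.io.pipeline.initialized()
    bool has_in = false;                        // state.io.in
};

enum class ToyPath
{
    Insert,              // processInsertQuery()
    InputCallbacks,      // execute the pipeline; data arrives via callbacks
    ProcessorsPipeline,  // processOrdinaryQueryWithProcessors()
    LegacyStream,        // processOrdinaryQuery()
    Nothing              // none of the above applies
};

// Same order of checks as in TCPHandler::runImpl().
ToyPath choosePath(const ToyQueryState & s)
{
    if (s.has_out)
        return ToyPath::Insert;
    if (s.need_receive_data_for_input)
        return ToyPath::InputCallbacks;
    if (s.pipeline_initialized)
        return ToyPath::ProcessorsPipeline;
    if (s.has_in)
        return ToyPath::LegacyStream;
    return ToyPath::Nothing;
}
```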
           

In the method above, executeQuery(...) is the entry point of a query; it is implemented in the executeQuery.cpp file.

QueryProcessingStage

The stage up to which a query is executed. It is defined in the QueryProcessingStage.h file as follows:

/// Up to what stage the SELECT query is executed or needs to be executed.
namespace QueryProcessingStage
{
    /// Numbers matter - the later stage has a larger number.
    ///
    /// It is part of Protocol ABI, add values only to the end.
    /// Also keep in mind that the code may depends on the order of fields, so be double aware when you will add new values.
    enum Enum
    {
        /// Only read/have been read the columns specified in the query.
        FetchColumns       = 0,
        /// Until the stage where the results of processing on different servers can be combined.
        WithMergeableState = 1,
        /// Completely.
        Complete           = 2,
        /// Until the stage where the aggregate functions were calculated and finalized.
        ///
        /// It is used for auto distributed_group_by_no_merge optimization for distributed engine.
        /// (See comments in StorageDistributed).
        WithMergeableStateAfterAggregation = 3,

        MAX = 4,
    };
}
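Because this enum is part of the protocol ABI, a stage number arriving from a client has to be range-checked before it is used. A minimal sketch, assuming a hypothetical helper stageFromWire() and a copy of the enum values shown above:

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

namespace ToyQueryProcessingStage
{
    // Same values as QueryProcessingStage::Enum above.
    enum Enum
    {
        FetchColumns = 0,
        WithMergeableState = 1,
        Complete = 2,
        WithMergeableStateAfterAggregation = 3,
        MAX = 4,
    };
}

// Hypothetical: convert a value read from the network into the enum,
// rejecting anything outside [0, MAX).
std::optional<ToyQueryProcessingStage::Enum> stageFromWire(uint64_t value)
{
    if (value >= static_cast<uint64_t>(ToyQueryProcessingStage::MAX))
        return std::nullopt;
    return static_cast<ToyQueryProcessingStage::Enum>(value);
}
```

Keeping MAX as the last enumerator is what makes such a check possible, and it is one reason new values may only be appended at the end.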
           

executeQuery.cpp

The method below is called from runImpl() in TCPHandler.cpp to execute a single query.

BlockIO executeQuery(
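    // The query string, e.g. a SELECT, INSERT or CREATE statement
    const String & query,
    // The context, taken from connection_context in TCPHandler; it holds the parameters that can be configured while the query runs
    Context & context,
    bool internal,
    // The stage up to which the query should be processed (see QueryProcessingStage)
    QueryProcessingStage::Enum stage,
    // For an INSERT, the rows to insert may follow the statement text itself; this flag says whether such embedded data may be present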
    bool may_have_embedded_data)
{
    ASTPtr ast;
    BlockIO streams;
    std::tie(ast, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context,
        internal, stage, !may_have_embedded_data, nullptr);

    if (const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()))
    {
        String format_name = ast_query_with_output->format
                ? getIdentifierName(ast_query_with_output->format)
                : context.getDefaultFormat();

        if (format_name == "Null")
            streams.null_format = true;
    }

    return streams;
}
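The tail of executeQuery() only decides whether the output format is Null. The same logic on a toy AST hierarchy (ToyAST, ToyQueryWithOutput, ToyInsert and resolveNullFormat are hypothetical names) looks like this: the dynamic_cast succeeds only for statements that can carry a FORMAT clause, and the context's default format is used when none is given.

```cpp
#include <cassert>
#include <memory>
#include <optional>
#include <string>

struct ToyAST
{
    virtual ~ToyAST() = default;
};

// Statements that may end with "FORMAT <name>", like ASTQueryWithOutput.
struct ToyQueryWithOutput : ToyAST
{
    std::optional<std::string> format;
};

struct ToyInsert : ToyAST {};

// Returns true when the result should be discarded (FORMAT Null).
bool resolveNullFormat(const std::shared_ptr<ToyAST> & ast, const std::string & default_format)
{
    if (const auto * with_output = dynamic_cast<const ToyQueryWithOutput *>(ast.get()))
    {
        std::string format_name = with_output->format ? *with_output->format : default_format;
        return format_name == "Null";
    }
    return false;
}
```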
           

executeQuery(…) in turn does its work by calling the following function:

static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
    const char * begin,
    const char * end,
    Context & context,
    bool internal,
    QueryProcessingStage::Enum stage,
    bool has_query_tail,
    ReadBuffer * istr)
{
    const auto current_time = std::chrono::system_clock::now();

    /// If we already executing query and it requires to execute internal query, than
    /// don't replace thread context with given (it can be temporary). Otherwise, attach context to thread.
    if (!internal)
    {
        context.makeQueryContext();
        CurrentThread::attachQueryContext(context);
    }

    const Settings & settings = context.getSettingsRef();
    /// Create a ParserQuery object that parses the SQL statement and builds an AST
    ParserQuery parser(end);
    ASTPtr ast;
    const char * query_end;

    /// Don't limit the size of internal queries.
    size_t max_query_size = 0;
    if (!internal)
        max_query_size = settings.max_query_size;

    try
    {
        /// TODO Parser should fail early when max_query_size limit is reached.
        ast = parseQuery(parser, begin, end, "", max_query_size, settings.max_parser_depth);

        /// Interpret SETTINGS clauses as early as possible (before invoking the corresponding interpreter),
        /// to allow settings to take effect.
        if (const auto * select_query = ast->as<ASTSelectQuery>())
        {
            if (auto new_settings = select_query->settings())
                InterpreterSetQuery(new_settings, context).executeForCurrentContext();
        }
        else if (const auto * select_with_union_query = ast->as<ASTSelectWithUnionQuery>())
        {
            if (!select_with_union_query->list_of_selects->children.empty())
            {
                if (auto new_settings = select_with_union_query->list_of_selects->children.back()->as<ASTSelectQuery>()->settings())
                    InterpreterSetQuery(new_settings, context).executeForCurrentContext();
            }
        }
        else if (const auto * query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()))
        {
            if (query_with_output->settings_ast)
                InterpreterSetQuery(query_with_output->settings_ast, context).executeForCurrentContext();
        }

        auto * insert_query = ast->as<ASTInsertQuery>();

        if (insert_query && insert_query->settings_ast)
            InterpreterSetQuery(insert_query->settings_ast, context).executeForCurrentContext();

        if (insert_query && insert_query->data)
        {
            query_end = insert_query->data;
            insert_query->has_tail = has_query_tail;
        }
        else
        {
            query_end = end;
        }
    }
    catch (...)
    {
        /// Anyway log the query.
        String query = String(begin, begin + std::min(end - begin, static_cast<ptrdiff_t>(max_query_size)));

        auto query_for_logging = prepareQueryForLogging(query, context);
        logQuery(query_for_logging, context, internal);

        if (!internal)
        {
            onExceptionBeforeStart(query_for_logging, context, time_in_microseconds(current_time), ast);
        }

        throw;
    }

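    /// The code below interprets the AST and builds a physical plan, which is in fact a QueryPipeline instance: an executable, nested pipeline.
    /// A QueryPipeline can be seen as a wrapper around Pipe objects; a Pipe is a set of transforms over the data, and all output ports of a Pipe share the same header.
    ...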
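The SETTINGS handling in the try block means that a query-level clause such as SETTINGS max_threads = 1 overrides the connection-level value before the interpreter is created, without touching the connection's own settings (TCPHandler gives each query a copy via query_context = connection_context). A minimal sketch of that precedence, assuming settings are a plain string map (ToySettings and applyQuerySettings are hypothetical, not the Settings class):

```cpp
#include <cassert>
#include <map>
#include <string>

using ToySettings = std::map<std::string, std::string>;

// Like InterpreterSetQuery(...).executeForCurrentContext(): every setting from the
// query's SETTINGS clause overwrites the value inherited from the connection.
void applyQuerySettings(ToySettings & query_context, const ToySettings & settings_clause)
{
    for (const auto & [name, value] : settings_clause)
        query_context[name] = value;
}
```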
           

parseQuery.cpp

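This file defines a set of global helper functions. The parseQuery(...) function called from executeQuery.cpp above is defined here, and parseQuery(...) in turn does its work by calling the methods of the ParserQuery instance created at the beginning.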

ParserQuery.cpp

Inheritance: class ParserQuery : public IParserBase, and class IParserBase : public IParser

As the code above shows, the first step in executing a query is to parse the SQL string into an AST, and the entry point of parsing is the parseImpl(...) method defined in this file:

bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    ParserQueryWithOutput query_with_output_p;
    ParserInsertQuery insert_p(end);
    ParserUseQuery use_p;
    ParserSetQuery set_p;
    ParserSystemQuery system_p;
    ParserCreateUserQuery create_user_p;
    ParserCreateRoleQuery create_role_p;
    ParserCreateQuotaQuery create_quota_p;
    ParserCreateRowPolicyQuery create_row_policy_p;
    ParserCreateSettingsProfileQuery create_settings_profile_p;
    ParserDropAccessEntityQuery drop_access_entity_p;
    ParserGrantQuery grant_p;
    ParserSetRoleQuery set_role_p;
    ParserExternalDDLQuery external_ddl_p;

    bool res = query_with_output_p.parse(pos, node, expected)
        || insert_p.parse(pos, node, expected)
        || use_p.parse(pos, node, expected)
        || set_role_p.parse(pos, node, expected)
        || set_p.parse(pos, node, expected)
        || system_p.parse(pos, node, expected)
        || create_user_p.parse(pos, node, expected)
        || create_role_p.parse(pos, node, expected)
        || create_quota_p.parse(pos, node, expected)
        || create_row_policy_p.parse(pos, node, expected)
        || create_settings_profile_p.parse(pos, node, expected)
        || drop_access_entity_p.parse(pos, node, expected)
        || grant_p.parse(pos, node, expected)
        || external_ddl_p.parse(pos, node, expected);

    return res;
}
           

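The code above shows that ClickHouse handles all of its statement types here, including DML, DDL and DQL. ParserQuery tries the parsers one by one and returns success as soon as one of them can parse the statement; this trial-and-error approach naturally costs some performance.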

Since we are following a SELECT statement, parsing returns success once ParserQueryWithOutput has run.
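The `a.parse(...) || b.parse(...) || ...` chain is an ordered choice: each alternative is tried from the same position, so a failed attempt must not consume input. A minimal sketch of that pattern, assuming a hypothetical token-vector parser; as far as we can tell, IParserBase::parse() similarly rewinds pos when parseImpl() fails, which is what lets the next alternative start cleanly.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

using Tokens = std::vector<std::string>;

struct ToyParser
{
    virtual ~ToyParser() = default;

    // Template method: run parseImpl and rewind the position on failure.
    bool parse(const Tokens & tokens, size_t & pos, std::string & node)
    {
        size_t begin = pos;
        if (parseImpl(tokens, pos, node))
            return true;
        pos = begin;
        return false;
    }

protected:
    virtual bool parseImpl(const Tokens & tokens, size_t & pos, std::string & node) = 0;
};

// Accepts a fixed keyword sequence, e.g. {"SHOW", "TABLES"}, and names the node.
struct ToyKeywordsParser : ToyParser
{
    ToyKeywordsParser(Tokens keywords_, std::string name_)
        : keywords(std::move(keywords_)), name(std::move(name_)) {}

protected:
    bool parseImpl(const Tokens & tokens, size_t & pos, std::string & node) override
    {
        for (const auto & keyword : keywords)
        {
            if (pos >= tokens.size() || tokens[pos] != keyword)
                return false;   // partial match: pos has moved, parse() rewinds it
            ++pos;
        }
        node = name;
        return true;
    }

private:
    Tokens keywords;
    std::string name;
};

// Ordered choice, like ParserQuery::parseImpl: the first alternative that succeeds wins.
bool parseAny(std::vector<std::unique_ptr<ToyParser>> & alternatives, const Tokens & tokens,
              size_t & pos, std::string & node)
{
    for (auto & parser : alternatives)
        if (parser->parse(tokens, pos, node))
            return true;
    return false;
}
```

Because every alternative starts from the same position, the cost of a miss is the tokens it examined before failing, which is the performance overhead mentioned above.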

ParserQueryWithOutput.cpp

ParserQueryWithOutput inherits from IParserBase, so calling ParserQueryWithOutput::parse(...) ultimately calls parseImpl(...), which does the actual work:

bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    ParserShowTablesQuery show_tables_p;
    ParserSelectWithUnionQuery select_p;
    ParserTablePropertiesQuery table_p;
    ParserDescribeTableQuery describe_table_p;
    ParserShowProcesslistQuery show_processlist_p;
    ParserCreateQuery create_p;
    ParserAlterQuery alter_p;
    ParserRenameQuery rename_p;
    ParserDropQuery drop_p;
    ParserCheckQuery check_p;
    ParserOptimizeQuery optimize_p;
    ParserKillQueryQuery kill_query_p;
    ParserWatchQuery watch_p;
    ParserShowAccessQuery show_access_p;
    ParserShowAccessEntitiesQuery show_access_entities_p;
    ParserShowCreateAccessEntityQuery show_create_access_entity_p;
    ParserShowGrantsQuery show_grants_p;
    ParserShowPrivilegesQuery show_privileges_p;
    ParserExplainQuery explain_p;

    ASTPtr query;

    bool parsed =
           explain_p.parse(pos, query, expected)
        || select_p.parse(pos, query, expected)
        || show_create_access_entity_p.parse(pos, query, expected) /// should be before `show_tables_p`
        || show_tables_p.parse(pos, query, expected)
        || table_p.parse(pos, query, expected)
        || describe_table_p.parse(pos, query, expected)
        || show_processlist_p.parse(pos, query, expected)
        || create_p.parse(pos, query, expected)
        || alter_p.parse(pos, query, expected)
        || rename_p.parse(pos, query, expected)
        || drop_p.parse(pos, query, expected)
        || check_p.parse(pos, query, expected)
        || kill_query_p.parse(pos, query, expected)
        || optimize_p.parse(pos, query, expected)
        || watch_p.parse(pos, query, expected)
        || show_access_p.parse(pos, query, expected)
        || show_access_entities_p.parse(pos, query, expected)
        || show_grants_p.parse(pos, query, expected)
        || show_privileges_p.parse(pos, query, expected);

    if (!parsed)
        return false;
    ...
    /// Other finishing work
}
           

As shown above, statements that produce output are further divided into kinds such as SHOW, EXPLAIN and SELECT, so we continue by following the ParserSelectWithUnionQuery class.

ParserSelectWithUnionQuery.cpp

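A SELECT statement comes in two forms, single and compound: when the UNION ALL keyword is present the statement is compound, otherwise it is a single SELECT. UNION ALL means that the SQL statement can be split into several queries, each of which can be executed on its own, so a ParserList object is used to hold these possibly parallel statements: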

bool ParserSelectWithUnionQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    ASTPtr list_node;

    ParserList parser(std::make_unique<ParserUnionQueryElement>(), std::make_unique<ParserKeyword>("UNION ALL"), false);
    if (!parser.parse(pos, list_node, expected))
        return false;

    auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();

    node = select_with_union_query;
    select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();
    select_with_union_query->children.push_back(select_with_union_query->list_of_selects);

    // flatten inner union query
    for (auto & child : list_node->children)
        getSelectsFromUnionListNode(child, select_with_union_query->list_of_selects->children);

    return true;
}
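The loop at the end flattens nested UNION ALL lists, so that a query like `(SELECT 1 UNION ALL SELECT 2) UNION ALL SELECT 3` ends up as three sibling SELECTs. A simplified sketch of that recursion on a hypothetical node type (the real getSelectsFromUnionListNode works on ASTPtr children):

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical AST node: either a single SELECT (text) or a UNION ALL list.
struct ToyNode
{
    bool is_union = false;
    std::string select_text;
    std::vector<ToyNode> selects;   // children when is_union is true
};

// Recursively append every single SELECT under `node` to `out`,
// dropping the intermediate UNION ALL levels.
void collectSelects(const ToyNode & node, std::vector<std::string> & out)
{
    if (node.is_union)
    {
        for (const auto & child : node.selects)
            collectSelects(child, out);
    }
    else
        out.push_back(node.select_text);
}
```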
           

So, whether or not UNION ALL is present, the complete parsing of each SELECT is delegated to a ParserUnionQueryElement object.

ParserUnionQueryElement.cpp

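This class's parse method first tries ParserSubquery (for a parenthesized query); failing that, it simply creates a new ParserSelectQuery object and returns the result of its parse: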

bool ParserUnionQueryElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    /// ParserSubquery tries to parse a subquery, i.e. one enclosed in ( and )
    /// ParserSelectQuery tries to parse a plain SELECT statement
    if (!ParserSubquery().parse(pos, node, expected) && !ParserSelectQuery().parse(pos, node, expected))
        return false;

    if (const auto * ast_subquery = node->as<ASTSubquery>())
        node = ast_subquery->children.at(0);

    return true;
}
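ParserUnionQueryElement thus accepts either a bare SELECT or a parenthesized one and, in the latter case, keeps only the inner query. A string-level sketch of that unwrapping; parseUnionElement is a hypothetical helper that only inspects the outer parentheses and the leading keyword, not a real parser.

```cpp
#include <cassert>
#include <optional>
#include <string>

// Returns the inner text for "( ... )" (like ParserSubquery followed by
// node = ast_subquery->children.at(0)), the text itself if it starts with
// SELECT (like ParserSelectQuery), and nothing otherwise.
std::optional<std::string> parseUnionElement(const std::string & text)
{
    if (text.size() >= 2 && text.front() == '(' && text.back() == ')')
        return text.substr(1, text.size() - 2);
    if (text.rfind("SELECT", 0) == 0)   // starts with "SELECT"
        return text;
    return std::nullopt;
}
```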
           

ParserSelectQuery.cpp

Finally we reach the parsing of a concrete SELECT statement. The method is defined as follows:

bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    auto select_query = std::make_shared<ASTSelectQuery>();
    node = select_query;

    ParserKeyword s_select("SELECT");
    ParserKeyword s_distinct("DISTINCT");
    ParserKeyword s_from("FROM");
    ParserKeyword s_prewhere("PREWHERE");
    ParserKeyword s_where("WHERE");
    ParserKeyword s_group_by("GROUP BY");
    ParserKeyword s_with("WITH");
    ParserKeyword s_totals("TOTALS");
    ParserKeyword s_having("HAVING");
    ParserKeyword s_order_by("ORDER BY");
    ParserKeyword s_limit("LIMIT");
    ParserKeyword s_settings("SETTINGS");
    ParserKeyword s_by("BY");
    ParserKeyword s_rollup("ROLLUP");
    ParserKeyword s_cube("CUBE");
    ParserKeyword s_top("TOP");
    ParserKeyword s_with_ties("WITH TIES");
    ParserKeyword s_offset("OFFSET");
    ParserKeyword s_fetch("FETCH");
    ParserKeyword s_only("ONLY");
    ParserKeyword s_row("ROW");
    ParserKeyword s_rows("ROWS");
    ParserKeyword s_first("FIRST");
    ParserKeyword s_next("NEXT");

    ParserNotEmptyExpressionList exp_list(false);
    ParserNotEmptyExpressionList exp_list_for_with_clause(false);
    ParserNotEmptyExpressionList exp_list_for_select_clause(true);    /// Allows aliases without AS keyword.
    ParserExpressionWithOptionalAlias exp_elem(false);
    ParserOrderByExpressionList order_list;

    ParserToken open_bracket(TokenType::OpeningRoundBracket);
    ParserToken close_bracket(TokenType::ClosingRoundBracket);

    ASTPtr with_expression_list;
    ASTPtr select_expression_list;
    ASTPtr tables;
    ASTPtr prewhere_expression;
    ASTPtr where_expression;
    ASTPtr group_expression_list;
    ASTPtr having_expression;
    ASTPtr order_expression_list;
    ASTPtr limit_by_length;
    ASTPtr limit_by_offset;
    ASTPtr limit_by_expression_list;
    ASTPtr limit_offset;
    ASTPtr limit_length;
    ASTPtr top_length;
    ASTPtr settings;

    /// WITH expr list
    {
        if (s_with.ignore(pos, expected))
        {
            if (!ParserList(std::make_unique<ParserWithElement>(), std::make_unique<ParserToken>(TokenType::Comma))
                     .parse(pos, with_expression_list, expected))
                return false;
            if (with_expression_list->children.empty())
                return false;
        }
    }
    ...
           

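The code above shows that ClickHouse supports the common SELECT syntax and also implements some keywords of its own, such as PREWHERE, WITH TOTALS and LIMIT ... BY.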
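ParserSelectQuery works clause by clause: each optional keyword is tried in a fixed order and, if present, the following expression is stored in its own slot of ASTSelectQuery. A heavily simplified sketch of that idea, assuming whitespace-separated tokens and treating each clause body as raw text (ToySelect, clause_order and parseToySelect are hypothetical; the real parser parses expressions with the ParserExpression* classes):

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <map>
#include <sstream>
#include <string>
#include <vector>

// One slot per clause, like the ASTPtr locals in ParserSelectQuery::parseImpl.
struct ToySelect
{
    std::map<std::string, std::string> clauses;   // clause keyword -> clause body
};

// Clauses may only appear in this order; all are optional except SELECT.
const std::vector<std::string> clause_order = {"WITH", "SELECT", "FROM", "PREWHERE", "WHERE", "LIMIT"};

bool isClauseKeyword(const std::string & token)
{
    return std::find(clause_order.begin(), clause_order.end(), token) != clause_order.end();
}

bool parseToySelect(const std::string & query, ToySelect & out)
{
    std::vector<std::string> tokens;
    std::istringstream in(query);
    for (std::string token; in >> token;)
        tokens.push_back(token);

    size_t pos = 0;
    for (const auto & keyword : clause_order)
    {
        // Like s_where.ignore(pos, expected): try the keyword, skip the clause if absent.
        if (pos >= tokens.size() || tokens[pos] != keyword)
        {
            if (keyword == "SELECT")
                return false;          // SELECT itself is mandatory
            continue;
        }
        ++pos;

        // The clause body runs until the next clause keyword.
        std::string body;
        while (pos < tokens.size() && !isClauseKeyword(tokens[pos]))
            body += (body.empty() ? "" : " ") + tokens[pos++];
        if (body.empty())
            return false;              // keyword without an expression
        out.clauses[keyword] = body;
    }
    return pos == tokens.size();       // leftovers mean clauses were out of order
}
```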

Interpreting a SELECT query

executeQuery.cpp

We saw earlier that executeQueryImpl(...), defined in this file, parses a SQL string into an AST. The next step is to build the plan tree, i.e. the interpret phase:

static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
    const char * begin,
    const char * end,
    Context & context,
    bool internal,
    QueryProcessingStage::Enum stage,
    bool has_query_tail,
    ReadBuffer * istr)
{
    /// Parse the SQL string and build the AST
    ...
    try
    {
        auto interpreter = InterpreterFactory::get(ast, context, stage);

        std::shared_ptr<const EnabledQuota> quota;
        if (!interpreter->ignoreQuota())
        {
            quota = context.getQuota();
            if (quota)
            {
                quota->used(Quota::QUERIES, 1);
                quota->checkExceeded(Quota::ERRORS);
            }
        }

        StreamLocalLimits limits;
        if (!interpreter->ignoreLimits())
        {
            limits.mode = LimitsMode::LIMITS_CURRENT;
            limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
        }
        // Call the interpreter's execute() to build an executable pipeline (it is run later by the caller).
        // res is a BlockIO
        res = interpreter->execute();
        QueryPipeline & pipeline = res.pipeline;
        bool use_processors = pipeline.initialized();

        if (const auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
        {
            /// Save insertion table (not table function). TODO: support remote() table function.
            auto table_id = insert_interpreter->getDatabaseTable();
            if (!table_id.empty())
                context.setInsertionTable(std::move(table_id));
        }
        ...
    }
    ...
}
           

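As shown above, the InterpreterFactory factory method creates the interpreter that matches the type of the parsed AST. For our SELECT this leads to an InterpreterSelectQuery object, whose InterpreterSelectQuery::execute() method is then called.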
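InterpreterFactory::get() is essentially a type switch over the root of the AST. A reduced sketch of that factory pattern with hypothetical types (the real factory checks many more AST classes and also receives the context and the processing stage):

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>
#include <string>

struct ToyAST { virtual ~ToyAST() = default; };
struct ToySelectWithUnionAST : ToyAST {};
struct ToyInsertAST : ToyAST {};

struct ToyInterpreter
{
    virtual ~ToyInterpreter() = default;
    virtual std::string execute() = 0;   // the real execute() returns a BlockIO
};

struct ToySelectInterpreter : ToyInterpreter
{
    std::string execute() override { return "select pipeline"; }
};

struct ToyInsertInterpreter : ToyInterpreter
{
    std::string execute() override { return "insert pipeline"; }
};

// Pick the interpreter from the dynamic type of the AST root.
std::unique_ptr<ToyInterpreter> getInterpreter(const ToyAST & ast)
{
    if (dynamic_cast<const ToySelectWithUnionAST *>(&ast))
        return std::make_unique<ToySelectInterpreter>();
    if (dynamic_cast<const ToyInsertAST *>(&ast))
        return std::make_unique<ToyInsertInterpreter>();
    throw std::logic_error("no interpreter for this AST type");
}
```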

InterpreterSelectQuery.cpp

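For a SELECT, InterpreterFactory::get(…) leads to an InterpreterSelectQuery instance (strictly, for an ASTSelectWithUnionQuery it returns an InterpreterSelectWithUnionQuery, which creates one InterpreterSelectQuery per SELECT). When this class is constructed it already traverses the whole AST, checking the tree and performing basic optimizations and adjustments: detecting invalid syntax and expressions, optimizing PREWHERE, optimizing JOINs, and so on. See the constructor in its .cpp file for the details.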

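Once the InterpreterSelectQuery object has been created, execute() is called explicitly to start executing the tree. The tree cannot actually be executed directly, though; two more steps are needed, as the following code shows: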

BlockIO InterpreterSelectQuery::execute()
{
    BlockIO res;
    QueryPlan query_plan;
    // Build the logical plan tree (QueryPlan)
    buildQueryPlan(query_plan);
    // Build the physical plan, i.e. the pipeline
    res.pipeline = std::move(*query_plan.buildQueryPipeline());
    return res;
}
           

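The buildQueryPlan(query_plan) method builds a logical tree, the QueryPlan, from the parsed and adjusted AST. It is really a tree of QueryPlanStep objects; both are described in the section on related classes at the end.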

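Internally, buildQueryPlan(...) calls executeImpl(...), which walks the nodes of the AST top-down, collects the information each kind of operation needs, creates the QueryPlanStep objects one by one and adds them to the QueryPlan. The key code is: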

void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe)
{
    /** Streams of data. When the query is executed in parallel, we have several data streams.
     *  If there is no GROUP BY, then perform all operations before ORDER BY and LIMIT in parallel, then
     *  if there is an ORDER BY, then glue the streams using ResizeProcessor, and then MergeSorting transforms,
     *  if not, then glue it using ResizeProcessor,
     *  then apply LIMIT.
     *  If there is GROUP BY, then we will perform all operations up to GROUP BY, inclusive, in parallel;
     *  a parallel GROUP BY will glue streams into one,
     *  then perform the remaining operations with one resulting stream.
     */

    /// Now we will compose block streams that perform the necessary actions.
    auto & query = getSelectQuery();
    const Settings & settings = context->getSettingsRef();
    auto & expressions = analysis_result;
    auto & subqueries_for_sets = query_analyzer->getSubqueriesForSets();
    bool intermediate_stage = false;
    bool to_aggregation_stage = false;
    bool from_aggregation_stage = false;

    if (options.only_analyze)
    {
        auto read_nothing = std::make_unique<ReadNothingStep>(source_header);
        query_plan.addStep(std::move(read_nothing));

        if (expressions.prewhere_info)
        {
            auto prewhere_step = std::make_unique<FilterStep>(
                    query_plan.getCurrentDataStream(),
                    expressions.prewhere_info->prewhere_actions,
                    expressions.prewhere_info->prewhere_column_name,
                    expressions.prewhere_info->remove_prewhere_column);

            prewhere_step->setStepDescription("PREWHERE");
            query_plan.addStep(std::move(prewhere_step));

            // To remove additional columns in dry run
            // For example, sample column which can be removed in this stage
            if (expressions.prewhere_info->remove_columns_actions)
            {
                auto remove_columns = std::make_unique<ExpressionStep>(
                        query_plan.getCurrentDataStream(),
                        expressions.prewhere_info->remove_columns_actions);

                remove_columns->setStepDescription("Remove unnecessary columns after PREWHERE");
                query_plan.addStep(std::move(remove_columns));
            }
        }
    }
    /// Construction of the remaining steps
    ...
}
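Each addStep() call in executeImpl() puts the new step on top of the current plan, so the plan grows upward from the data source and the last step added becomes the root. A minimal sketch of that bookkeeping, assuming single-input steps only (ToyQueryPlan is hypothetical; it mimics the nodes/root fields of the QueryPlan class shown later, and multi-input steps via unitePlans() are omitted):

```cpp
#include <cassert>
#include <list>
#include <string>
#include <utility>
#include <vector>

struct ToyQueryPlan
{
    struct Node
    {
        std::string step;                 // stands in for QueryPlanStepPtr
        std::vector<Node *> children;
    };

    std::list<Node> nodes;                // std::list keeps Node pointers stable
    Node * root = nullptr;

    // The new step consumes the output of the current root.
    void addStep(std::string step)
    {
        std::vector<Node *> children;
        if (root)
            children.push_back(root);
        nodes.push_back(Node{std::move(step), std::move(children)});
        root = &nodes.back();
    }
};
```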
           

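Once the whole QueryPlan tree has been built, we only have a logical plan; it must still be converted into a physical plan before execution can really start. That is the job of QueryPlan::buildQueryPipeline(), which also first optimizes the logical plan by calling optimize().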

QueryPlan.cpp

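buildQueryPipeline() converts the QueryPlanStep nodes of the logical tree, bottom-up and left to right, into IProcessor instances, i.e. the nodes of the physical plan, and organizes them into a QueryPipeline. The QueryPlan tree is traversed as follows: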

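// One level of the logical plan tree per step; "->" means the left side runs before the right side, i.e. the left step is the child of the right one.
// For a query without subqueries the logical tree looks like this: an upside-down tree with one step per level, where the "pipeline" is this chain of parent/child links:
// prepared_source_step -> prewhere_step -> remove_columns ->
// row_level_security_step -> before_array_join_step ->
// array_join_step -> before_join_step -> join_step -> where_step ->
// aggregating_step -> partial_sorting -> merge_sorting_step ->
// merging_sorted -> merging_aggregated -> having_step ->
// expression_step -> distinct_step (root node)
// prepared_source_step is the data source to be read. It may be a subquery; if so, the subquery is executed first,
// so the pipeline above actually looks like:
// subquery --complete--> prepared_source_step -> ...
//
// The pipeline is assembled bottom-up: the traversal starts at the root, but a step's pipeline is built only after
// those of all its children. The logical plan above therefore turns into:
// Pipeline = initializePipeline -> FilterTransform -> ExpressionTransform -> ... -> DistinctTransform
// so a pipeline executes from left to right.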
QueryPipelinePtr QueryPlan::buildQueryPipeline()
{
    checkInitialized();
    optimize();
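    // A Frame represents one node together with the pipelines already built for its children.
    // The logical plan is traversed bottom-up and left to right: the I-th node of level N is visited
    // only after the first I-1 nodes of level N have been finished. Because a query may contain parallel
    // subqueries, e.g. (select a union select b), a vector holds the pipelines of the current node's children;
    // its size also tells whether all of them have been processed.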
    struct Frame
    {
        Node * node;
        QueryPipelines pipelines = {};
    };

    QueryPipelinePtr last_pipeline;

    std::stack<Frame> stack;
    stack.push(Frame{.node = root});

    while (!stack.empty())
    {
        auto & frame = stack.top();

        if (last_pipeline)
        {
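            // A child step and its whole subtree have just been finished: append the child's pipeline
            // to this node's list, so that the parent operator can tell from the list's length
            // whether all of its children have been processed.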
            frame.pipelines.emplace_back(std::move(last_pipeline));
            last_pipeline = nullptr;
        }

        size_t next_child = frame.pipelines.size();
        if (next_child == frame.node->children.size())
        {
            bool limit_max_threads = frame.pipelines.empty();
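            // All children of the current node are done, so its input pipelines are complete.
            // Pass the children's pipelines to this step, which appends its own IProcessors;
            // the result holds the processors of this node and its entire subtree.
            last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines));
            // A leaf step (no child pipelines, i.e. a source) gets the max_threads limit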
            if (limit_max_threads && max_threads)
                last_pipeline->limitMaxThreads(max_threads);

            stack.pop();
        }
        else
            /// Descend into the next child so that its pipeline is built first
            stack.push(Frame{.node = frame.node->children[next_child]});
    }

    for (auto & context : interpreter_context)
        last_pipeline->addInterpreterContext(std::move(context));

    return last_pipeline;
}
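The loop above is an iterative post-order traversal: a node's step is applied only after the pipelines of all its children have been built and collected in its Frame. The same control flow on a toy tree, where "building a pipeline" just records the visit order and concatenates strings (ToyNode and buildToyPipeline are hypothetical names):

```cpp
#include <cassert>
#include <cstddef>
#include <stack>
#include <string>
#include <vector>

struct ToyNode
{
    std::string step;
    std::vector<ToyNode *> children;
};

// Mirrors QueryPlan::buildQueryPipeline(): a Frame holds a node and the results already
// produced by its children; a node is finished once it has one result per child.
std::string buildToyPipeline(ToyNode * root, std::vector<std::string> & visit_order)
{
    struct Frame
    {
        ToyNode * node;
        std::vector<std::string> pipelines;
    };

    std::string last_pipeline;
    bool has_last = false;

    std::stack<Frame> stack;
    stack.push(Frame{root, {}});

    while (!stack.empty())
    {
        auto & frame = stack.top();

        if (has_last)
        {
            frame.pipelines.push_back(last_pipeline);   // hand the child's result to its parent
            has_last = false;
        }

        size_t next_child = frame.pipelines.size();
        if (next_child == frame.node->children.size())
        {
            // "updatePipeline": combine the children's pipelines and append this step.
            std::string combined;
            for (const auto & p : frame.pipelines)
                combined += (combined.empty() ? "" : " + ") + p;
            last_pipeline = combined.empty() ? frame.node->step : "(" + combined + ") -> " + frame.node->step;
            has_last = true;
            visit_order.push_back(frame.node->step);
            stack.pop();
        }
        else
            stack.push(Frame{frame.node->children[next_child], {}});
    }
    return last_pipeline;
}
```

std::stack uses std::deque by default, and push() on a deque does not invalidate references to existing elements, so holding `frame` across a push is safe here, as in the original loop.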
           

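Next, an example of how a QueryPlanStep is turned into its IProcessor instances. As the code above shows, this happens in updatePipeline(...). A filter condition such as dt='20210501', for example, corresponds to a FilterStep instance. FilterStep derives from ITransformingStep, whose updatePipeline method is defined as follows: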

QueryPipelinePtr ITransformingStep::updatePipeline(QueryPipelines pipelines)
{
    if (collect_processors)
    {
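        // Collect the processors that get added to the first pipeline (a transforming step has a single input)
        QueryPipelineProcessorsCollector collector(*pipelines.front(), this);
        // Add the IProcessors of this concrete ITransformingStep to that pipeline
        transformPipeline(*pipelines.front());
        // detachProcessors() returns the IProcessors added while the collector was active,
        // with their step reference set to this step; they are kept in this step's `processors`
        // so that each processor can be traced back to the plan step that created it
        processors = collector.detachProcessors();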
    }
    else
        transformPipeline(*pipelines.front());

    return std::move(pipelines.front());
}
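The collector in updatePipeline() can be read as "record every processor added while this object is alive"; Pipe.h (shown at the end) keeps a collected_processors pointer for this debugging purpose. A minimal RAII sketch, assuming processors are represented by their names (ToyPipeline and ToyCollector are hypothetical):

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

struct ToyPipeline
{
    std::vector<std::string> processors;
    std::vector<std::string> * collected = nullptr;   // like Pipe::collected_processors

    void addProcessor(const std::string & name)
    {
        processors.push_back(name);
        if (collected)
            collected->push_back(name);               // also record it for the active collector
    }
};

// Like QueryPipelineProcessorsCollector: active from construction to destruction.
class ToyCollector
{
public:
    explicit ToyCollector(ToyPipeline & pipeline_) : pipeline(pipeline_) { pipeline.collected = &processors; }
    ~ToyCollector() { pipeline.collected = nullptr; }

    ToyCollector(const ToyCollector &) = delete;
    ToyCollector & operator=(const ToyCollector &) = delete;

    // Hand over everything recorded so far (the real detachProcessors() also
    // tags each processor with the plan step that created it).
    std::vector<std::string> detachProcessors() { return std::exchange(processors, {}); }

private:
    ToyPipeline & pipeline;
    std::vector<std::string> processors;
};
```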
           

The following code shows how a filter operator is built from its logical node into physical nodes:

void FilterStep::transformPipeline(QueryPipeline & pipeline)
{
    auto expression = std::make_shared<ExpressionActions>(actions_dag);
    // Add a filter operator to the input pipeline: the FilterTransform returned by the lambda below,
    // created once for each stream
    pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
    {
        bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
        return std::make_shared<FilterTransform>(header, expression, filter_column_name, remove_filter_column, on_totals);
    });
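    // If the pipeline's current header does not match this step's output header in structure,
    // some expression has changed the columns (e.g. their types), so append an ExpressionTransform
    // that converts the columns, matched by name, to the expected output header.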
    if (!blocksHaveEqualStructure(pipeline.getHeader(), output_stream->header))
    {
        auto convert_actions_dag = ActionsDAG::makeConvertingActions(
                pipeline.getHeader().getColumnsWithTypeAndName(),
                output_stream->header.getColumnsWithTypeAndName(),
                ActionsDAG::MatchColumnsMode::Name);
        auto convert_actions = std::make_shared<ExpressionActions>(convert_actions_dag);

        pipeline.addSimpleTransform([&](const Block & header)
        {
            return std::make_shared<ExpressionTransform>(header, convert_actions);
        });
    }
}
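addSimpleTransform() calls the supplied getter once per output stream, so a pipeline with N parallel streams gets N independent FilterTransform instances. A toy model of that behaviour, where each row is just a dt value (Block, ToyPipeline, addSimpleTransform and makeFilter here are hypothetical simplifications, not the ClickHouse classes):

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

using Block = std::vector<std::string>;                 // one "dt" value per row
using Transform = std::function<Block(const Block &)>;

struct ToyPipeline
{
    std::vector<std::vector<Transform>> streams;        // transforms per parallel stream

    explicit ToyPipeline(size_t num_streams) : streams(num_streams) {}

    // Like QueryPipeline::addSimpleTransform: one new transform per stream.
    void addSimpleTransform(const std::function<Transform()> & getter)
    {
        for (auto & stream : streams)
            stream.push_back(getter());
    }

    // Run stream `i` over one input block.
    Block run(size_t i, Block block) const
    {
        for (const auto & transform : streams[i])
            block = transform(block);
        return block;
    }
};

// Hypothetical FilterTransform: keep rows equal to `value`.
Transform makeFilter(std::string value)
{
    return [value](const Block & in) {
        Block out;
        for (const auto & row : in)
            if (row == value)
                out.push_back(row);
        return out;
    };
}
```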
           

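Updating the pipeline for a FilterStep is therefore simple: it only creates the corresponding IProcessor instances (a FilterTransform, plus an ExpressionTransform when the headers differ) and adds them to the pipeline's default group.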

Classes involved in query execution

QueryPlan.h

QueryPlan is a tree structure. Each of its nodes, Node, is essentially a wrapper around a QueryPlanStepPtr, which points to a QueryPlanStep object. The class is defined as follows:

/// A tree of query steps.
/// The goal of QueryPlan is to build QueryPipeline.
/// QueryPlan let delay pipeline creation which is helpful for pipeline-level optimisations.
class QueryPlan
{
public:
    QueryPlan();
    ~QueryPlan();
    QueryPlan(QueryPlan &&);
    QueryPlan & operator=(QueryPlan &&);

    void unitePlans(QueryPlanStepPtr step, std::vector<QueryPlanPtr> plans);
    void addStep(QueryPlanStepPtr step);

    bool isInitialized() const { return root != nullptr; } /// Tree is not empty
    bool isCompleted() const; /// Tree is not empty and root hasOutputStream()
    const DataStream & getCurrentDataStream() const; /// Checks that (isInitialized() && !isCompleted())

    void optimize();

    QueryPipelinePtr buildQueryPipeline();

    /// If initialized, build pipeline and convert to pipe. Otherwise, return empty pipe.
    Pipe convertToPipe();

    void explainPlan(WriteBuffer & buffer, const ExplainPlanOptions & options);
    void explainPipeline(WriteBuffer & buffer, const ExplainPipelineOptions & options);

    /// Set upper limit for the recommend number of threads. Will be applied to the newly-created pipelines.
    /// TODO: make it in a better way.
    void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
    size_t getMaxThreads() const { return max_threads; }

    void addInterpreterContext(std::shared_ptr<Context> context);

    /// Tree node. Step and it's children.
    struct Node
    {
        QueryPlanStepPtr step;
        std::vector<Node *> children = {};
    };

    using Nodes = std::list<Node>;

private:
    Nodes nodes;
    Node * root = nullptr;

    void checkInitialized() const;
    void checkNotCompleted() const;

    /// Those fields are passed to QueryPipeline.
    size_t max_threads = 0;
    std::vector<std::shared_ptr<Context>> interpreter_context;
};
           

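From its definition we can see that, besides the basic information that makes up the tree, QueryPlan also has an optimize() method, which optimizes the logical tree. Its implementation is examined later.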
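The Node/root layout of QueryPlan can be illustrated with a toy tree. In this sketch (MiniPlan, Step and the step names are hypothetical), addStep places a new step on top of the current root so that the old root becomes its only child, roughly how a one-input step is stacked onto an existing plan. The real addStep also validates the data streams, which is omitted here. Nodes are owned by a std::list, whose elements keep their addresses when new ones are appended, so the raw Node * pointers stored in children stay valid; that this is the reason for choosing std::list is our assumption.

```cpp
#include <cstddef>
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical minimal step: just a name.
struct Step {
    std::string name;
};
using StepPtr = std::unique_ptr<Step>;

class MiniPlan {
public:
    // Tree node: a step and its children (same shape as QueryPlan::Node).
    struct Node {
        StepPtr step;
        std::vector<Node *> children = {};
    };

    // Put a step on top of the current root; the old root becomes its only child.
    void addStep(StepPtr step) {
        Node new_node{std::move(step)};
        if (root)
            new_node.children.push_back(root);
        nodes.push_back(std::move(new_node));
        root = &nodes.back();  // std::list does not move existing nodes on push_back
    }

    const Node * getRoot() const { return root; }
    std::size_t size() const { return nodes.size(); }

private:
    std::list<Node> nodes;  // owns every node of the tree
    Node * root = nullptr;
};
```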

QueryPipeline.h

Pipe.h

/// Pipe is a set of processors which represents the part of pipeline.
/// Pipe contains a list of output ports, with specified port for totals and specified port for extremes.
/// All output ports have same header.
/// All other ports are connected, all connections are inside processors set.
class Pipe
{
private:
    /// Destruction order: processors, header, locks, temporary storages, local contexts
    Holder holder;

    /// Header is common for all output below.
    Block header;
    Processors processors;

    /// Output ports. Totals and extremes are allowed to be empty.
    OutputPortRawPtrs output_ports;
    OutputPort * totals_port = nullptr;
    OutputPort * extremes_port = nullptr;

    /// It is the max number of processors which can be executed in parallel for each step.
    /// Usually, it's the same as the number of output ports.
    size_t max_parallel_streams = 0;

    /// If is set, all newly created processors will be added to this too.
    /// It is needed for debug. See QueryPipelineProcessorsCollector.
    Processors * collected_processors = nullptr;

    /// This methods are for QueryPipeline. It is allowed to complete graph only there.
    /// So, we may be sure that Pipe always has output port if not empty.
    bool isCompleted() const { return !empty() && output_ports.empty(); }
    static Pipe unitePipes(Pipes pipes, Processors * collected_processors);
    void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter);
    void setOutputFormat(ProcessorPtr output);

    friend class QueryPipeline;
};
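The comments in Pipe describe a few invariants: all output ports share one header, and a pipe is completed when it is non-empty but has no open output ports left. A toy model of just those rules, with processors and ports reduced to counters, might look like the following. ToyPipe and uniteToyPipes are hypothetical; the header check and the summing of max_parallel_streams are choices made for this sketch, not a transcription of Pipe::unitePipes.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in: a header is just the list of column names.
using ToyHeader = std::vector<std::string>;

struct ToyPipe {
    ToyHeader header;
    std::size_t num_processors = 0;        // size of the processor set
    std::size_t num_output_ports = 0;      // open output ports, all sharing `header`
    std::size_t max_parallel_streams = 0;

    bool empty() const { return num_processors == 0; }
    // Same condition as Pipe::isCompleted(): non-empty, but no open output ports left.
    bool isCompleted() const { return !empty() && num_output_ports == 0; }
};

// Pool the processors and output ports of several pipes.
// Pooling only makes sense if every non-empty pipe produces the same header.
ToyPipe uniteToyPipes(const std::vector<ToyPipe> & pipes) {
    ToyPipe result;
    for (const auto & pipe : pipes) {
        if (pipe.empty())
            continue;
        if (result.empty())
            result.header = pipe.header;
        else if (pipe.header != result.header)
            throw std::logic_error("pipes must have the same header");
        result.num_processors += pipe.num_processors;
        result.num_output_ports += pipe.num_output_ports;
        result.max_parallel_streams += pipe.max_parallel_streams;  // sketch: simply add up
    }
    return result;
}
```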
           

ITransformingStep.h

Inheritance: class ITransformingStep : public IQueryPlanStep

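This class represents a node with one input stream and one output stream. Many of its implementations are defined in *Step.h header files. The following lists all the *Step.h headers, i.e. all possible node types in the logical plan tree (the list also includes the base classes IQueryPlanStep, ISourceStep and ITransformingStep):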

.//Processors/QueryPlan/AggregatingStep.h
.//Processors/QueryPlan/AddingConstColumnStep.h
.//Processors/QueryPlan/ExpressionStep.h
.//Processors/QueryPlan/ReadNothingStep.h
.//Processors/QueryPlan/LimitByStep.h
.//Processors/QueryPlan/SettingQuotaAndLimitsStep.h
.//Processors/QueryPlan/ArrayJoinStep.h
.//Processors/QueryPlan/ReverseRowsStep.h
.//Processors/QueryPlan/OffsetStep.h
.//Processors/QueryPlan/AddingDelayedSourceStep.h
.//Processors/QueryPlan/DistinctStep.h
.//Processors/QueryPlan/CubeStep.h
.//Processors/QueryPlan/RollupStep.h
.//Processors/QueryPlan/LimitStep.h
.//Processors/QueryPlan/CreatingSetsStep.h
.//Processors/QueryPlan/TotalsHavingStep.h
.//Processors/QueryPlan/PartialSortingStep.h
.//Processors/QueryPlan/MaterializingStep.h
.//Processors/QueryPlan/FillingStep.h
.//Processors/QueryPlan/FilterStep.h
.//Processors/QueryPlan/UnionStep.h
.//Processors/QueryPlan/MergeSortingStep.h
.//Processors/QueryPlan/AddingMissedStep.h
.//Processors/QueryPlan/IQueryPlanStep.h
.//Processors/QueryPlan/ExtremesStep.h
.//Processors/QueryPlan/MergingSortedStep.h
.//Processors/QueryPlan/ISourceStep.h
.//Processors/QueryPlan/ITransformingStep.h
.//Processors/QueryPlan/MergingAggregatedStep.h
.//Processors/QueryPlan/FinishSortingStep.h
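The node kinds in the list above differ mainly in how many input streams they take. The following sketch models that with hypothetical classes: SourceStep (no input, compare ISourceStep), TransformingStep (exactly one input and one output, compare ITransformingStep), and a toy filter step whose output stream optionally drops the filter column. DataStream here is only a list of column names, far simpler than the real one.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stream description: only the column names.
struct DataStream { std::vector<std::string> columns; };

// Base of every plan node: some input streams and one output stream.
class IStep {
public:
    virtual ~IStep() = default;
    const std::vector<DataStream> & inputStreams() const { return input_streams; }
    const DataStream & outputStream() const { return output_stream; }

protected:
    std::vector<DataStream> input_streams;
    DataStream output_stream;
};

// Source node: no input, one output.
class SourceStep : public IStep {
public:
    explicit SourceStep(DataStream out) { output_stream = std::move(out); }
};

// Transforming node: exactly one input and one output.
class TransformingStep : public IStep {
public:
    TransformingStep(DataStream in, DataStream out) {
        input_streams.push_back(std::move(in));
        output_stream = std::move(out);
    }
};

// Toy filter step: same columns as the input, minus the filter column if requested.
class ToyFilterStep : public TransformingStep {
public:
    ToyFilterStep(const DataStream & in, const std::string & filter_column, bool remove_filter_column)
        : TransformingStep(in, makeOutput(in, filter_column, remove_filter_column)) {}

private:
    static DataStream makeOutput(const DataStream & in, const std::string & col, bool remove) {
        DataStream out;
        for (const auto & c : in.columns)
            if (!(remove && c == col))
                out.columns.push_back(c);
        return out;
    }
};
```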
           

*Transform.h

Possible node types in the physical plan tree; they largely correspond one-to-one with the Step nodes:

.//Processors/IInflatingTransform.h
.//Processors/OffsetTransform.h
.//Processors/ISimpleTransform.h
.//Processors/IAccumulatingTransform.h
.//Processors/Merges/VersionedCollapsingTransform.h
.//Processors/Merges/AggregatingSortedTransform.h
.//Processors/Merges/SummingSortedTransform.h
.//Processors/Merges/MergingSortedTransform.h
.//Processors/Merges/CollapsingSortedTransform.h
.//Processors/Merges/IMergingTransform.h
.//Processors/Merges/ReplacingSortedTransform.h
.//Processors/Merges/GraphiteRollupSortedTransform.h
.//Processors/LimitTransform.h
.//Processors/Transforms/ArrayJoinTransform.h
.//Processors/Transforms/DistinctTransform.h
.//Processors/Transforms/FillingTransform.h
.//Processors/Transforms/MergeSortingTransform.h
.//Processors/Transforms/PartialSortingTransform.h
.//Processors/Transforms/ExtremesTransform.h
.//Processors/Transforms/JoiningTransform.h
.//Processors/Transforms/CopyTransform.h
.//Processors/Transforms/MaterializingTransform.h
.//Processors/Transforms/ReverseTransform.h
.//Processors/Transforms/AddingMissedTransform.h
.//Processors/Transforms/CubeTransform.h
.//Processors/Transforms/FinishSortingTransform.h
.//Processors/Transforms/LimitByTransform.h
.//Processors/Transforms/AggregatingTransform.h
.//Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h
.//Processors/Transforms/LimitsCheckingTransform.h
.//Processors/Transforms/ExpressionTransform.h
.//Processors/Transforms/AggregatingInOrderTransform.h
.//Processors/Transforms/TotalsHavingTransform.h
.//Processors/Transforms/CreatingSetsTransform.h
.//Processors/Transforms/AddingConstColumnTransform.h
.//Processors/Transforms/RollupTransform.h
.//Processors/Transforms/MergingAggregatedTransform.h
.//Processors/Transforms/SortingTransform.h
.//Processors/Transforms/FilterTransform.h
.//Processors/Transforms/AddingSelectorTransform.h
.//DataStreams/SquashingTransform.h
           

Optimization of the logical tree QueryPlan

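Although QueryPlan already applies some optimizations while it is being built, a separate method is dedicated to further optimization. In this version, however, it only pushes LIMIT down (and, as the code below shows, tries to lift ARRAY JOIN up). Many other optimizations would be possible, but ClickHouse does not implement them, so it can be said that ClickHouse optimizes simple queries fairly well but is not good at optimizing complex ones.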

void QueryPlan::optimize()
{
    struct Frame
    {
        Node * node;
        size_t next_child = 0;
    };

    std::stack<Frame> stack;
    stack.push(Frame{.node = root});

    while (!stack.empty())
    {
        auto & frame = stack.top();

        if (frame.next_child == 0)
        {
            /// First entrance, try push down.
            if (frame.node->children.size() == 1)
                tryPushDownLimit(frame.node->step, frame.node->children.front());
        }

        if (frame.next_child < frame.node->children.size())
        {
            stack.push(Frame{frame.node->children[frame.next_child]});
            ++frame.next_child;
        }
        else
        {
            /// Last entrance, try lift up.
            if (frame.node->children.size() == 1)
                tryLiftUpArrayJoin(frame.node, frame.node->children.front(), nodes);

            stack.pop();
        }
    }
}
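optimize() is an iterative depth-first traversal with an explicit stack: a node's first entrance (next_child == 0) is a pre-order hook where LIMIT push-down is attempted, and its last entrance (all children visited) is a post-order hook where ARRAY JOIN lift-up is attempted. The sketch below keeps only that traversal skeleton and records the two hooks in a trace; ToyNode and traverse are hypothetical names.

```cpp
#include <cstddef>
#include <stack>
#include <string>
#include <vector>

// Hypothetical toy tree node standing in for QueryPlan::Node.
struct ToyNode {
    std::string name;
    std::vector<ToyNode *> children;
};

// Same pattern as QueryPlan::optimize(): a stack of frames, a pre-order hook on
// first entrance and a post-order hook on last entrance.
void traverse(ToyNode * root, std::vector<std::string> & trace) {
    struct Frame {
        ToyNode * node;
        std::size_t next_child = 0;
    };

    std::stack<Frame> stack;
    stack.push(Frame{root});

    while (!stack.empty()) {
        auto & frame = stack.top();

        if (frame.next_child == 0)
            trace.push_back("enter " + frame.node->name);  // push-down would go here

        if (frame.next_child < frame.node->children.size()) {
            ToyNode * child = frame.node->children[frame.next_child];
            ++frame.next_child;  // update before push, so `frame` is not used afterwards
            stack.push(Frame{child});
        } else {
            trace.push_back("leave " + frame.node->name);  // lift-up would go here
            stack.pop();
        }
    }
}
```

One small difference: the original increments frame.next_child after stack.push(...). That is still safe because std::stack uses std::deque by default, and push_back on a deque does not invalidate references to existing elements; the sketch increments first so it does not depend on that detail.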