天天看點

eKuiper 源碼解讀:從一條 SQL 到流處理任務的旅程

作者:Java機械師

概述

LF Edge eKuiper 是 Golang 實作的輕量級物聯網邊緣分析、流式處理開源軟體,可以運作在各類資源受限的邊緣裝置上。eKuiper 的主要目标是在邊緣端提供一個流媒體軟體架構。其規則引擎允許使用者提供基于SQL 或基于圖形(類似于 Node-RED)的規則,在幾分鐘内建立物聯網邊緣分析應用。

本文中,我們将以源碼為脈絡,闡述一條 SQL 從被 eKuiper 接收後,是如何從一條文本變成一個可執行的處理過程。通過本文,你可以了解到以下内容:

  1. 一個 SQL 計算引擎基本的處理流程
  2. eKuiper 在每個處理流程中的具體代碼節點

準備

為了更加直覺地了解到 eKuiper 内部的代碼運作邏輯,在講解 eKuiper 規則引擎的處理過程中,我們會涉及到 eKuiper 中的一部分代碼,并對其中的關鍵部分進行較為詳細的講解。

為了更好地了解之後的内容,你需要了解:

  1. eKuiper 項目:github.com/lf-edge/eku…
  2. Golang 的基礎用法

架構

從 eKuiper 接收到 SQL 的文本,到最終根據這個 SQL 的語義去做出相應的讀取與計算工作。eKuiper 内部的 SQL 計算引擎在其中承擔了解析、構造、優化與運作這總共 4 部分工作,即我們之後将重點關注 SQL 處理過程中的以下幾個環節:

  1. SQL Parser 将 SQL 文本轉換為 AST 對象
  2. 基于 AST 對象生成邏輯計劃
  3. 優化邏輯計劃并生成執行算子
  4. 運作執行算子,開始讀取資料與計算并最終将結果寫入到下遊

從 SQL 文本到執行算子樹

從這一節開始,我們将開始根據 eKuiper 中的代碼節點,來了解一條 SQL 文本是如何一步步被最終轉換為一個可以被實際執行的算子樹。

以下代碼實際展示了 eKuiper 代碼中解析文本、優化計劃、構造執行算子這幾個處理流程,我們将一一進行展開了解。

go複制代碼func PlanSQLWithSourcesAndSinks(rule *api.Rule, sources []*node.SourceNode, sinks []*node.SinkNode) (*topo.Topo, error) {
    sql := rule.Sql
    conf.Log.Infof("Init rule with options %+v", rule.Options)
    stmt, err := xsql.GetStatementFromSql(sql)
    if err != nil {
       return nil, err
    }

    ......
    // Create logical plan and optimize. Logical plans are a linked list
    lp, err := createLogicalPlan(stmt, rule.Options, store)
    if err != nil {
       return nil, err
    }
    tp, err := createTopo(rule, lp, sources, sinks, streamsFromStmt)
    if err != nil {
       return nil, err
    }
    return tp, nil
}
           

解析 SQL 文本

通過以下函數,我們将一個 SQL 文本解析為了 AST 對象

go複制代碼func GetStatementFromSql(sql string) (*ast.SelectStatement, error) {
           

本文中我們暫時先不涉及 SQL 解析器中的具體實作細節,相關内容将在之後的 eKuiper 源碼閱讀中進行講解。感興趣的朋友可以通過以下函數作為入口進行了解:

scss複制代碼func (p *Parser) Parse() (*ast.SelectStatement, error) {
           

值得一提的是,在 SQL Parser的具體實作上,也有一些其他 well-known 的資料庫實作使用了 yacc 的方案來直接生成 SQL Parser。eKuiper 之是以選擇自己實作 SQL Parser,一個非常重要的原因是對于一個運作在邊緣端的應用而言,binary size 是一個非常重要的名額。自己實作 SQL Parser 而非使用 yacc 這類的 Parser Generator 的技術,有助于控制和降低 eKuiper 編譯後整體的 binary size 的大小。

構造與優化邏輯計劃

當 SQL 文本還解析為 AST 對象後,我們需要将該 AST 對象轉換為一個可以用來描述該 SQL 應當被計算引擎如何執行的邏輯計劃。這一步驟被封裝在了以下代碼函數入口中:

go複制代碼func createLogicalPlan(stmt *ast.SelectStatement, opt *api.RuleOption, store kv.KeyValue) (LogicalPlan, error) {
           

在 createLogicalPlan 函數中,它接收一個 AST 樹對象,并返還一個邏輯計劃樹對象,在整個函數過程中,它總共做了以下 3 件事情:

  1. 抽取 SQL 中的各類資訊,并将其與實際的表達式或者是 schema 資訊進行綁定。
  2. 根據 AST 對象構造最初的邏輯計劃。
  3. 根據最初的邏輯計劃進行邏輯優化。

在一條 SQL 中,它所帶的資訊裡包含了一些原本注冊計算引擎中的資訊,比如流、表的定義,也包含了一些臨時聲明的資訊,比如列或者表達式的 alias name。在以下代碼函數入口中,eKuiper 會從 AST 樹對象中抽取出以下資訊,并進行響應的綁定:

go複制代碼func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]streamInfo, []*ast.Call, error) {
           
  1. 從 AST 樹對象中抽取出流與表的 AST 對象,并從 eKuiper 的存儲中取出預先設定好的流、表的定義,并将這些 schema 資訊綁定到 ast 對象中。
  2. 從 AST 對象中将查詢中的 filed 與各個流、表進行綁定

當我們處理好 AST 樹對象中的各個節點的資訊綁定後,我們就可以根據 AST 樹對象來構造一個最初的邏輯計劃。以下代碼顯示了在 eKuiper 中是如何根據自底向上的建構邏輯計劃。從最底層的 DataSource 算子,一路向上 build 邏輯算子,直至整個邏輯算子樹構造完畢。

go複制代碼func createLogicalPlan(stmt *ast.SelectStatement, opt *api.RuleOption, store kv.KeyValue) (LogicalPlan, error) {
// 1. build Datasource
// 2. build Window
// 3. Buld JoinAlign / Join
// 4. Build Filter
// 5. Build Agg
// 6. Build Having
// 7. Build Sort
// 8. Build Proj
}
           

當我們獲得了最原始的邏輯計劃樹以後,我們需要對邏輯計劃進行邏輯優化。邏輯優化階段會對原本的計劃進行優化。邏輯優化階段,簡單來說就是對一個邏輯算子樹進行等價的變換,這個變換并不會影響最終的計算結果,但是可以讓計算過程減少更多不必要的計算量。

舉一個簡單的例子,對于 select * from t1 join t2 on [t1.](http://t1.id)a = t2.a where t1.b > 10 這條 SQL 來說,其原本的邏輯計劃如下:

eKuiper 源碼解讀:從一條 SQL 到流處理任務的旅程

然後在邏輯優化階段,我們可以将 Filter 算子進行下推至 Join 算子之下,進而讓參與 Join 算子的資料量被提前過濾一部分,來減少整個計算過程中所涉及到的計算量。

eKuiper 源碼解讀:從一條 SQL 到流處理任務的旅程

以下代碼展示了 eKuiper 中是如何進行邏輯優化的:

go複制代碼var optRuleList = []logicalOptRule{
    &columnPruner{},
    &predicatePushDown{},
}

func optimize(p LogicalPlan) (LogicalPlan, error) {
    var err error
    for _, rule := range optRuleList {
        p, err = rule.optimize(p)
        if err != nil {
            return nil, err
        }
    }
    return p, err
}
           

在随後的系列當中我們會比較詳細地介紹目前 eKuiper 中的邏輯優化環節中的代碼細節。

當我們的邏輯計劃優化完畢以後,我們需要根據邏輯計劃來構造具體的執行算子。在 eKuiper 中,我們通過 Topo 結構來維護整個執行算子的上下文環境。

以下代碼展示了建構執行算子的函數入口:

css複制代碼func createTopo(rule *api.Rule, lp LogicalPlan, sources []*node.SourceNode, sinks []*node.SinkNode, streamsFromStmt []string) (*topo.Topo, error) {
type Topo struct {
   ......
   sources            []node.DataSourceNode
   sinks              []*node.SinkNode
   ops                []node.OperatorNode
   ......
}
           

Topo 作為執行算子 Context,會将邏輯計劃中的 DataSource 算子放在 sources 中,将其他算子放在 ops 中,而最終的 SQL 結果會彙總到 sinks 中。 在這裡我們重點關注算子是如何構造的:

以下代碼展示了 eKuiper 中是如何根據邏輯算子構造執行算子的:

go複制代碼func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []*node.SourceNode, streamsFromStmt []string, index int) (api.Emitter, int, error) {
    var inputs []api.Emitter
    newIndex := index
    for _, c := range lp.Children() {
       input, ni, err := buildOps(c, tp, options, sources, streamsFromStmt, newIndex)
       .......
    }
    ......
    switch t := lp.(type) {
    case *DataSourcePlan:
       isSchemaless := t.isSchemaless
       switch t.streamStmt.StreamType {
       case ast.TypeStream:
          ......
          op = srcNode
    ......
    case *ProjectPlan:
       op = Transform(&operator.ProjectOp{ColNames: t.colNames, AliasNames: t.aliasNames, AliasFields: t.aliasFields, ExprFields: t.exprFields, IsAggregate: t.isAggregate, AllWildcard: t.allWildcard, WildcardEmitters: t.wildcardEmitters, ExprNames: t.exprNames, SendMeta: t.sendMeta}, fmt.Sprintf("%d_project", newIndex), options)
    default:
       return nil, 0, fmt.Errorf("unknown logical plan %v", t)
    }
    ......
    if onode, ok := op.(node.OperatorNode); ok {
       tp.AddOperator(inputs, onode)
    }
    return op, newIndex, nil
}
           

在構造算子的過程中,我們主要關注 2 個問題:

  1. buildOps 是如何周遊整個邏輯算子樹,将每個邏輯算子轉換為執行算子
  2. buildOps 是如何串聯起整個執行算子的樹形結構,将下層算子的 Ouput 結果傳遞給上層算子的 Input 來源。

在 buildOps 過程中,通過遞歸的方式,以自底向上的方式周遊整個邏輯算子樹來構造執行算子。當下層算子構造完畢以後,我們在以下代碼中會将下層算子的 Ouput 作為結果參數傳遞給上層算子的構造過程中,将下層算子的 Output 和上層算子的 Input 連接配接起來

css複制代碼if onode, ok := op.(node.OperatorNode); ok {
   tp.AddOperator(inputs, onode)
}
           

當執行算子樹被建立完畢以後,我們會将頂層算子的 Output 和這條 SQL 的 sink 連接配接起來,進而使得 eKuiper 會将 SQL 計算的結果寫入到下遊的 sink 中。

go複制代碼func createTopo(rule *api.Rule, lp LogicalPlan, sources []*node.SourceNode, sinks []*node.SinkNode, streamsFromStmt []string) (*topo.Topo, error) {
    ......
    input, _, err := buildOps(lp, tp, rule.Options, sources, streamsFromStmt, 0)
    if err != nil {
        return nil, err
    }
    inputs := []api.Emitter{input}
    ......
    for _, sink := range sinks {
        tp.AddSink(inputs, sink)
    }
    ......
    return tp, nil
}
           

啟動執行算子樹

當執行算子樹被構造完畢後,我們就需要啟動執行算子樹來真正執行這條 SQL,在以下的代碼中展示了 eKuiper 啟動執行算子的代碼入口:

go複制代碼func (s *Topo) Open() <-chan error {
    ......
    for _, snk := range s.sinks {
       snk.Open(s.ctx.WithMeta(s.name, snk.GetName(), s.store), s.drain)
    }

    //apply operators, if err bail
    for _, op := range s.ops {
       op.Exec(s.ctx.WithMeta(s.name, op.GetName(), s.store), s.drain)
    }

    // open source, if err bail
    for _, source := range s.sources {
       source.Open(s.ctx.WithMeta(s.name, source.GetName(), s.store), s.drain)
    }
    .......
}
           

我們會以 sink / 執行算子 / source 的順序,開始啟動每個環節的算子。在這裡,我們以單個算子運作為例,來了解執行算子的運作過程中的大緻邏輯。

在以下的代碼中展示了,對于單個算子而言,是如何讀取下層算子的資料,進行計算,然後傳遞給上層算子進行處理。

go複制代碼func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
    ......   
    for {
       select {
       // process incoming item
       case item := <-o.input:
          ......
          result := o.op.Apply(exeCtx, item, fv, afv)
          switch val := result.(type) {
          default:
             .......
             o.Broadcast(val)
          }
       // is cancelling
       case <-ctx.Done():
          return
       }
    }
}
           

每個執行算子會從自己的 input channel 中取出下層算子傳遞的資料,對于 UnaryOperator 而言,會通過 Apply 行為來将資料進行計算,将計算後的結果通過 Broadcast 轉交給上層算子進行處理。

總結

在本篇文章中,我們以梳理關鍵代碼節點的方式了解了 eKuiper 的 SQL 計算引擎中是如何解析、處理,并最終執行這條 SQL 得到相應的結果。對于整個計算引擎關鍵處理節點裡,我們了解了每個環節的代碼大緻是如何運作的。

在後續的分享中,我們将以具體 SQL 為例,深入到各個環節、算子的内部執行的代碼邏輯,進而讓大家更好地了解 eKuiper 是如何在邊緣端接受資料、處理計算并最終寫入下遊的整體流程。敬請期待。

繼續閱讀