天天看點

Kylin查詢源碼分析什麼是Kylin什麼是CalciteKylin查詢源碼分析

什麼是Kylin

Apache Kylin是一個開源的、分布式的分析型資料倉庫,提供Hadoop/Spark 之上的 SQL 查詢接口及多元分析(OLAP)能力以支援超大規模資料,最初由 eBay 開發并貢獻至開源社群。它能在亞秒内查詢巨大的表。

Kylin的查詢高性能主要依賴于Cube理論,如圖所示:

Kylin查詢源碼分析什麼是Kylin什麼是CalciteKylin查詢源碼分析

它将表字段劃分為次元和量度,通過預先計算,在次元上進行量度聚合并儲存聚合結果,而根據次元進行聚合查詢時,則可以命中已儲存的聚合結果,大大減少資料掃描量和實時計算量。

Kylin的系統架構如圖所示:

Kylin查詢源碼分析什麼是Kylin什麼是CalciteKylin查詢源碼分析

它依賴大資料基礎設施HBase、Spark、Hadoop等實作分布式的存儲和計算,并基于這些基礎設施,設計了建構引擎和查詢引擎來分别實作資料的建構和查詢。在查詢引擎部分,Kylin使用了Calcite來實作SQL的解析、優化和執行。

什麼是Calcite

Calcite是一個用于優化異構資料源的查詢處理的基礎架構,提供了标準的 SQL 語言、多種查詢優化和連接配接各種資料源的能力。從功能上看,它支援SQL 解析、SQL 校驗、SQL 查詢優化、SQL 生成、資料連接配接查詢等,但不包括資料處理和存儲。Calcite的架構如圖所示:

Kylin查詢源碼分析什麼是Kylin什麼是CalciteKylin查詢源碼分析

資料處理和存儲系統提供中繼資料和規則至Calcite,Calcite提供JDBC Server面向用戶端查詢,并對SQL查詢請求進行處理,在Calcite中,一個SQL的解析和執行大概經過以下5個步驟:

  1. 将SQL解析成抽象文法樹;
  2. 對抽象文法樹進行校驗;
  3. 将抽象文法樹解析成關系代數表達式;
  4. 對關系代數表達式進行優化,在保持語義不變的前提下,轉化為較優的表達式;
  5. 将優化後關系代數表達式轉化為實體執行計劃并計劃,傳回最終的結果。

目前Calcite在大資料和資料存儲領域有着廣泛的使用,如表所示:

Kylin查詢源碼分析什麼是Kylin什麼是CalciteKylin查詢源碼分析

Kylin、Phoenix、Hive、Flink等均使用Calcite實作JDBC驅動、SQL解析、SQL校驗、SQL 查詢優化等。

Kylin查詢源碼分析

資料模型

在源碼分析時,我們使用Kylin的官方資料模型示例,如圖所示:

Kylin查詢源碼分析什麼是Kylin什麼是CalciteKylin查詢源碼分析

該雪花模型包含以下5張表:

  1. KYLIN_SALES,銷售事實表;
  2. KYLIN_ACCOUNT,使用者次元表;
  3. KYLIN_CAL_DT,日期次元表;
  4. KYLIN_CATEGORY_GROUPING,類别次元表
  5. KYLIN_COUNTR,國家次元表。

示例SQL如下所示:

select s.lstg_site_id,sum(s.price) as price_sum from kylin_sales as s inner join kylin_account as a on s.buyer_id = a.account_id where a.account_country='US' group by s.lstg_site_id order by price_sum desc limit 10           

用于查詢各站點來自美國消費者的銷售額。

入口

Kylin支援多種查詢入口,包括WEB控制台、REST API、JDBC驅動、ODBC驅動等。這裡介紹了一下JDBC驅動的實作。

當用戶端使用JDBC接口通路時,加載的JDBC驅動是org.apache.kylin.jdbc.Driver,這裡,Kylin使用了JDBC驅動架構Avatica,Driver即繼承自org.apache.calcite.avatica.UnregisteredDriver,覆寫了getFactoryClassName方法,如下所示:

@Override
    protected String getFactoryClassName(JdbcVersion jdbcVersion) {
        switch (jdbcVersion) {
        case JDBC_30:
            throw new UnsupportedOperationException();
        case JDBC_40:
            return KylinJdbcFactory.Version40.class.getName();
        case JDBC_41:
        default:
            return KylinJdbcFactory.Version41.class.getName();
        }
}
           

該段代碼說明僅支援JDBC 4.0即以上版本協定,并傳回了相應的工廠類。工廠類KylinJdbcFactory實作了AvaticaFactory接口,用于建立JDBC相關接口執行個體,部分代碼如下:

@Override
public AvaticaStatement newStatement(AvaticaConnection connection, StatementHandle h, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
    return new KylinStatement((KylinConnection) connection, h, resultSetType, resultSetConcurrency, resultSetHoldability);
}

@Override
public AvaticaResultSet newResultSet(AvaticaStatement statement, QueryState state, Signature signature, TimeZone timeZone, Frame firstFrame) throws SQLException {
    AvaticaResultSetMetaData resultSetMetaData = new AvaticaResultSetMetaData(statement, null, signature);
    return new KylinResultSet(statement, state, signature, resultSetMetaData, timeZone, firstFrame);
}
           

該段代碼說明JDBC相關接口的實作是在Avatica架構實作基礎的進一步繼承和擴充,例如,ResultSet接口實作是KylinResultSet,其是AvaticaResultSet的子類,KylinResultSet主要覆寫了AvaticaResultSet的execute方法,該方法的核心代碼如下所示:

KylinConnection connection = (KylinConnection) statement.connection;
IRemoteClient client = connection.getRemoteClient();

Map<String, String> queryToggles = new HashMap<>();
int maxRows = statement.getMaxRows();
queryToggles.put("ATTR_STATEMENT_MAX_ROWS", String.valueOf(maxRows));
addServerProps(queryToggles, connection);

QueryResult result;
try {
    result = client.executeQuery(sql, paramValues, queryToggles);
} catch (IOException e) {
    throw new SQLException(e);
}
           

該段代碼說明擷取IRemoteClient執行個體,執行executeQuery方法傳回查詢結果,而從IRemoteClient的實作KylinClient中的代碼可以看到,其實質上是調用kylin-server子產品的REST API “kylin/api/query”來實作查詢。

那具體看一下該API的服務端,略過Controller層、緩存命中、注釋删除等環節,查詢代碼在org.apache.kylin.rest.service.QueryService的executeRequest方法中,如下所示:

Pair<List<List<String>>, List<SelectedColumnMeta>> r = null;
try {
    stat = conn.createStatement();
    processStatementAttr(stat, sqlRequest);
    resultSet = stat.executeQuery(correctedSql);

    r = createResponseFromResultSet(resultSet);

} catch (SQLException sqlException) {
    r = pushDownQuery(sqlRequest, correctedSql, conn, sqlException);
    if (r == null)
        throw sqlException;

    isPushDown = true;
} finally {
    close(resultSet, stat, null); //conn is passed in, not my duty to close
}
           

該段代碼仍是基于JDBC接口,而connection通過org.apache.kylin.query.QueryConnection的getConnection方法建立,代碼如下所示:

if (!isRegister) {
    try {
        Class<?> aClass = Thread.currentThread().getContextClassLoader()
                .loadClass("org.apache.calcite.jdbc.Driver");
        Driver o = (Driver) aClass.getDeclaredConstructor().newInstance();
        DriverManager.registerDriver(o);
    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
        e.printStackTrace();
    }
    isRegister = true;
}
File olapTmp = OLAPSchemaFactory.createTempOLAPJson(project, KylinConfig.getInstanceFromEnv());
Properties info = new Properties();
info.putAll(KylinConfig.getInstanceFromEnv().getCalciteExtrasProperties());
// Import calcite props from jdbc client(override the kylin.properties)
info.putAll(BackdoorToggles.getJdbcDriverClientCalciteProps());
info.put("model", olapTmp.getAbsolutePath());
info.put("typeSystem", "org.apache.kylin.query.calcite.KylinRelDataTypeSystem");
return DriverManager.getConnection("jdbc:calcite:", info);           

其加載的JDBC驅動是org.apache.calcite.jdbc.Driver,說明後續基于Calcite進行SQL解析、校驗、優化和執行。這裡,通過Calcite标準的model字段傳入中繼資料描述檔案

中繼資料

中繼資料描述檔案示例如下所示:

{
    "version": "1.0",
    "defaultSchema": "DEFAULT",
    "schemas": [
        {
            "type": "custom",
            "name": "DEFAULT",
            "factory": "org.apache.kylin.query.schema.OLAPSchemaFactory",
            "operand": {
                "project": "learn_kylin"
            },
            "functions": [
               {
                   name: 'PERCENTILE',
                   className: 'org.apache.kylin.measure.percentile.PercentileAggFunc'
               },
               {
                   name: 'CONCAT',
                   className: 'org.apache.kylin.query.udf.ConcatUDF'
               },
               {
                   name: 'MASSIN',
                   className: 'org.apache.kylin.query.udf.MassInUDF'
               },
               {
                   name: 'INTERSECT_COUNT',
                   className: 'org.apache.kylin.measure.bitmap.BitmapIntersectDistinctCountAggFunc'
               },
               {
                   name: 'VERSION',
                   className: 'org.apache.kylin.query.udf.VersionUDF'
               },
               {
                   name: 'PERCENTILE_APPROX',
                   className: 'org.apache.kylin.measure.percentile.PercentileAggFunc'
               }
            ]
        }
    ]
}           

OLAPSchemaFactory類實作了Calcite的SchemaFactory接口,建立OLAPSchema類執行個體,而OLAPSchema類則通過讀取HBase上存儲的中繼資料資訊生成OLAPTable類的Map集合,代碼如下所示:

public Map<String, Table> getTableMap() {
    return buildTableMap();
}           

OLAPTable類實作了Calcite的QueryableTable和TranslatableTable接口(這兩個接口均繼承自Table接口),用于描述表,其中比較重要的幾個方法如下所示:

public RelDataType getRowType(RelDataTypeFactory typeFactory) {
    if (this.rowType == null) {
        // always build exposedColumns and rowType together
        this.sourceColumns = getSourceColumns();
        this.rowType = deriveRowType(typeFactory);
    }
    return this.rowType;
}           

該方法在Table接口中定義,用于擷取表字段資訊,Kylin除了按Calcite定義傳回RelDataType類型的字段資訊,也會按自身定義儲存ColumnDesc類型的字段集合,其中包含次元資訊,用于後續執行實體查詢時判斷從哪些Cuboid掃描資料。

public Statistic getStatistic() {
    List<ImmutableBitSet> keys = new ArrayList<ImmutableBitSet>();
    return Statistics.of(100, keys);
}           

該方法在Table接口中定義,用于擷取表的行數等統計資訊,在CBO方式的優化中計算成本,但是Kylin存儲引擎統計的是各Cuboid的統計資訊,是以這裡統一傳回固定值。

@Override
public RelNode toRel(ToRelContext context, RelOptTable relOptTable) {
    int fieldCount = relOptTable.getRowType().getFieldCount();
    int[] fields = identityList(fieldCount);
    return new OLAPTableScan(context.getCluster(), relOptTable, this, fields);
}
           

該方法在TranslatableTable接口中定義,說明在優化過程中,掃描表資料的關系表達式節點會轉化為OLAPTableScan類執行個體,關于OLAPTableScan類會在後續優化、執行過程中再介紹。

同時,OLAPTable還有一些方法用于傳回Enumerable接口執行個體(也就是Kylin實作的OLAPQuery類),如下所示:

public Enumerable<Object[]> executeOLAPQuery(DataContext optiqContext, int ctxSeq) {
    return new OLAPQuery(optiqContext, EnumeratorTypeEnum.OLAP, ctxSeq);
}

public Enumerable<Object[]> executeLookupTableQuery(DataContext optiqContext, int ctxSeq) {
    return new OLAPQuery(optiqContext, EnumeratorTypeEnum.LOOKUP_TABLE, ctxSeq);
}

public Enumerable<Object[]> executeColumnDictionaryQuery(DataContext optiqContext, int ctxSeq) {
    return new OLAPQuery(optiqContext, EnumeratorTypeEnum.COL_DICT, ctxSeq);
}

public Enumerable<Object[]> executeHiveQuery(DataContext optiqContext, int ctxSeq) {
    return new OLAPQuery(optiqContext, EnumeratorTypeEnum.HIVE, ctxSeq);
}
           

這些方法在執行實體查詢時,Calcite會通過反射進行調用,擷取OLAPQuery類,實際的資料掃描委托給OLAPQuery完成,關于OLAPQuery類會在後續優化、執行過程中再介紹。

解析

查詢請求傳遞到服務端,并在服務端執行stat.executeQuery(correctedSql)後,會進入具體的查詢流程,首先進行SQL的解析。Calcite的文法解析是基于JavaCC實作的。JavaCC是一個文法解析器生成架構, 其根據預先定義的規則生成相應的解析器代碼。Calcite的文法解析規則是calcite-core中的codegen/templates/Parser.jj,根據其生成的解析器類是org.apache.calcite.sql.parser.impl. SqlParserImpl。調用該類執行個體解析SQL生成抽象文法樹(即SqlNode)的代碼在CalcitePrepareImpl中,如下所示:

SqlParser parser = createParser(query.sql,  parserConfig);
SqlNode sqlNode;
try {
  sqlNode = parser.parseStmt();
  statementType = getStatementType(sqlNode.getKind());
} catch (SqlParseException e) {
  throw new RuntimeException(
      "parse failed: " + e.getMessage(), e);
}
           

示例SQL經過解析得到的抽象文法樹如圖所示:

Kylin查詢源碼分析什麼是Kylin什麼是CalciteKylin查詢源碼分析

樹中的每個節點是SqlNode子類執行個體。

校驗

抽象文法樹通過SqlValidatorImpl類的validate方法進行校驗,校驗的細節此處暫不展開,僅列出校驗的主要步驟包括:

  1. 對抽象文法樹根節點進行标準化重寫,在保持語義的前提下,将SqlOrderBy、SqlDelete、SqlUpdate、SqlMerge等類型節點重寫為SqlSelect類型節點;
  2. 調用各節點的validate方法進行校驗。

代碼如下:

SqlNode outermostNode = performUnconditionalRewrites(topNode, false);
cursorSet.add(outermostNode);
top = outermostNode;
TRACER.trace("After unconditional rewrite: {}", outermostNode);
if (outermostNode.isA(SqlKind.TOP_LEVEL)) {
  registerQuery(scope, null, outermostNode, outermostNode, null, false);
}
outermostNode.validate(this, scope);
           

是以,上一步的抽象文法樹經過校驗會轉化為下圖:

Kylin查詢源碼分析什麼是Kylin什麼是CalciteKylin查詢源碼分析

關系代數表達式

校驗後的抽象文法樹通過SqlToRelConverter 類的convertQueryRecursive方法進一步解析成關系代數表達式。這個方法代碼如下所示:

final SqlKind kind = query.getKind();
switch (kind) {
case SELECT:
  return RelRoot.of(convertSelect((SqlSelect) query, top), kind);
case INSERT:
  return RelRoot.of(convertInsert((SqlInsert) query), kind);
case DELETE:
  return RelRoot.of(convertDelete((SqlDelete) query), kind);
case UPDATE:
  return RelRoot.of(convertUpdate((SqlUpdate) query), kind);
case MERGE:
  return RelRoot.of(convertMerge((SqlMerge) query), kind);
case UNION:
case INTERSECT:
case EXCEPT:
  return RelRoot.of(convertSetOp((SqlCall) query), kind);
case WITH:
  return convertWith((SqlWith) query, top);
case VALUES:
  return RelRoot.of(convertValues((SqlCall) query, targetRowType), kind);
default:
  throw new AssertionError("not a query: " + query);
           

從這個方法的命名就可以看出,方法内部是在遞歸調用,從抽象文法樹根節點開始,根據其類型,分别調用對應的convertSelect、convertInsert、convertDelete、convertUpdate、convertMerge等方法,轉化為RelNode類型執行個體(RelNode是關系代數表達式節點接口),而在這些方法内部,會對傳入節點的子節點,根據其類型,再遞歸調用這些方法,進而将抽象文法樹所有節點均轉化為RelNode類型執行個體,并建立互相之間的關系。通過convertSelect方法具體轉化SqlSelect類型節點的代碼部分如下所示:

convertFrom(
    bb,
    select.getFrom());
convertWhere(
    bb,
    select.getWhere());

final List<SqlNode> orderExprList = new ArrayList<>();
final List<RelFieldCollation> collationList = new ArrayList<>();
gatherOrderExprs(
    bb,
    select,
    select.getOrderList(),
    orderExprList,
    collationList);
final RelCollation collation =
    cluster.traitSet().canonize(RelCollations.of(collationList));

if (validator.isAggregate(select)) {
  convertAgg(
      bb,
      select,
      orderExprList);
} else {
  convertSelectList(
      bb,
      select,
      orderExprList);
}

if (select.isDistinct()) {
  distinctify(bb, true);
}

convertOrder(
    select, bb, collation, orderExprList, select.getOffset(),
    select.getFetch());
           

其依次對from、where、groupBy、orderBy等子節點進行轉化。最終,上一步的抽象文法樹進行轉化後得到下列關系代數表達式:

LogicalSort(sort0=[$1], dir0=[DESC], fetch=[10])
  LogicalAggregate(group=[{0}], PRICE_SUM=[SUM($1)])
    LogicalProject(LSTG_SITE_ID=[$4], PRICE=[$5])
      LogicalFilter(condition=[=($16, 'US')])
        LogicalJoin(condition=[=($7, $13)], joinType=[inner])
          OLAPTableScan(table=[[DEFAULT, KYLIN_SALES]], ctx=[], fields=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]])
          OLAPTableScan(table=[[DEFAULT, KYLIN_ACCOUNT]], ctx=[], fields=[[0, 1, 2, 3, 4, 5]])
           

其根節點排序(LogicalSort),葉子節點是兩張表的資料掃描(OLAPTableScan)

優化

在轉化得到關系代數表達式後,會進一步對其進行優化,在保持語義不變的前提下,對表達式進行轉化和調整以找到最優的表達式。優化器可以分為2類:

  1. 基于規則的優化器(Rule Based Optimizer,簡稱RBO):根據優化規則将一個關系表達式轉化為另外一個關系表達式,同時原有表達式被放棄,經過一系列轉化後生成最終表達式;
  2. 基于成本的優化器(Cost Based Optimizer,簡稱CBO):根據優化規則對關系表達式進行轉換,同時原有表達式也會保留,經過一系列轉化後生成多個表達式,之後計算每個表達式的成本,從中挑選成本最小的表達式作為最終表達式。

Calcite有兩個優化器實作:

  1. HepPlanner: RBO的實作,按照規則進行比對,直到達到次數限制或者周遊後無比對規則;
  2. VolcanoPlanner: CBO 的實作,一直疊代各規則,直到找到成本最小的表達式。

Calcite的優化在Prepare類的optimize方法中,部分代碼如下所示:

final RelOptPlanner planner = root.rel.getCluster().getPlanner();

…

final Program program = getProgram();
final RelNode rootRel4 = program.run(
    planner, root.rel, desiredTraits, materializationList, latticeList);
           

首先生成一個VolcanoPlanner類型的優化器執行個體,然後通過getProgram擷取Program接口執行個體,并通過該接口的run方法執行優化。Program接口有多個實作,getProgram方法最終是通過Programs類的standard方法擷取的SequenceProgram類執行個體,代碼如下所示:

return sequence(subQuery(metadataProvider),
    new DecorrelateProgram(),
    new TrimFieldsProgram(),
    program1,

    // Second planner pass to do physical "tweaks". This the first time
    // that EnumerableCalcRel is introduced.
    calc(metadataProvider));
           

也就是說,優化可劃分為串行執行的5步,包括将子查詢轉化為Join操作、删除無用字段等,其中第三步是使用已建立的VolcanoPlanner類型優化器執行個體進行優化。關系代數表達式的葉子節點是OLAPTableScan類型,該類覆寫了RelNode的register方法,其中添加了Kylin擴充的多個規則,如下表所示:

說明
OLAPToEnumerableConverterRule 将RelNode類型節點轉化為OLAPToEnumerableConverter類型節點
OLAPFilterRule 将LogicalFilter類型節點轉化為OLAPFilterRel類型節點
OLAPProjectRule 将LogicalProject類型節點轉化為OLAPProjectRel類型節點
OLAPAggregateRule 将LogicalAggregate類型節點轉化為OLAPAggregateRel類型節點
OLAPJoinRule 将LogicalJoin 類型節點轉化為 OLAPJoinRel或OLAPFilterRel類型節點
OLAPLimitRule 将Sort類型節點轉化為OLAPLimitRel類型節點
OLAPSortRule 将Sort類型節點轉化為OLAPSortRel類型節點
OLAPUnionRule 将Union類型節點轉化為OLAPUnionRel類型節點
OLAPWindowRule 将Window類型節點轉化為OLAPWindowRel類型節點
OLAPValuesRule 将LogicalValues類型節點轉化為OLAPValuesRel類型節點

同時,還删除了多個Calcite原生的規則,部分如下所示:

// since join is the entry point, we can't push filter past join
planner.removeRule(FilterJoinRule.FILTER_ON_JOIN);
planner.removeRule(FilterJoinRule.JOIN);

// since we don't have statistic of table, the optimization of join is too cost
planner.removeRule(JoinCommuteRule.INSTANCE);
planner.removeRule(JoinPushThroughJoinRule.LEFT);
planner.removeRule(JoinPushThroughJoinRule.RIGHT);

// keep tree structure like filter -> aggregation -> project -> join/table scan, implementOLAP() rely on this tree pattern
planner.removeRule(AggregateJoinTransposeRule.INSTANCE);
planner.removeRule(AggregateProjectMergeRule.INSTANCE);
planner.removeRule(FilterProjectTransposeRule.INSTANCE);
planner.removeRule(SortJoinTransposeRule.INSTANCE);
planner.removeRule(JoinPushExpressionsRule.INSTANCE);
planner.removeRule(SortUnionTransposeRule.INSTANCE);
planner.removeRule(JoinUnionTransposeRule.LEFT_UNION);
planner.removeRule(JoinUnionTransposeRule.RIGHT_UNION);
planner.removeRule(AggregateUnionTransposeRule.INSTANCE);
planner.removeRule(DateRangeRules.FILTER_INSTANCE);
planner.removeRule(SemiJoinRule.JOIN);
planner.removeRule(SemiJoinRule.PROJECT);
           

以此來保持表達式中的一些模式,便于後續實體執行計劃的相關操作。經過上述優化後,關系表達式轉化為如下結果:

OLAPLimitRel(ctx=[], fetch=[10])
    OLAPSortRel(sort0=[$1], dir0=[DESC], ctx=[])
      OLAPAggregateRel(group=[{0}], PRICE_SUM=[SUM($1)], ctx=[])
        OLAPProjectRel(LSTG_SITE_ID=[$4], PRICE=[$5], ctx=[])
          OLAPFilterRel(condition=[=($16, 'US')], ctx=[])
            OLAPJoinRel(condition=[=($7, $13)], joinType=[inner], ctx=[])
              OLAPTableScan(table=[[DEFAULT, KYLIN_SALES]], ctx=[], fields=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]])
              OLAPTableScan(table=[[DEFAULT, KYLIN_ACCOUNT]], ctx=[], fields=[[0, 1, 2, 3, 4, 5]])
           

執行

最後是将關系代數表達式轉化為實際實體執行計劃,掃描HBase中的資料并傳回結果,這裡正在梳理過程中,可先參考官方推薦的文檔

https://www.jianshu.com/p/21df8303d2ae

了解其原理,其中,會執行OLAPTableScan類的implement方法,如下所示:

@Override
public Result implement(EnumerableRelImplementor implementor, Prefer pref) {

    context.setReturnTupleInfo(rowType, columnRowType);
    String execFunction = genExecFunc();

    PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), JavaRowFormat.ARRAY);
    MethodCallExpression exprCall = Expressions.call(table.getExpression(OLAPTable.class), execFunction,
            implementor.getRootExpression(), Expressions.constant(context.id));
    return implementor.result(physType, Blocks.toBlock(exprCall));
}

public String genExecFunc() {
    // if the table to scan is not the fact table of cube, then it's a lookup table
    if (context.realization.getModel().isLookupTable(tableName)) {
        return "executeLookupTableQuery";
    } else if (DictionaryEnumerator.ifDictionaryEnumeratorEligible(context)) {
        return "executeColumnDictionaryQuery";
    } else {
        return "executeOLAPQuery";
    }

}
           

通過反射調用OLAPQuery類的相關方法,這些方法在之前介紹中繼資料時曾提及,其均傳回Enumerable接口執行個體。以OLAPEnumerator為例,其在疊代擷取結果時,實質上是調用queryStorage方法,代碼如下所示:

private ITupleIterator queryStorage() {

    logger.debug("query storage...");
    // bind dynamic variables
    olapContext.bindVariable(optiqContext);

    // If olapContext is cached, then inherit it.
    if (!olapContext.isBorrowedContext) {
        olapContext.resetSQLDigest();
    }
    SQLDigest sqlDigest = olapContext.getSQLDigest();

    // query storage engine
    IStorageQuery storageEngine = StorageFactory.createQuery(olapContext.realization);
    ITupleIterator iterator = storageEngine.search(olapContext.storageContext, sqlDigest,
            olapContext.returnTupleInfo);
    if (logger.isDebugEnabled()) {
        logger.debug("return TupleIterator...");
    }

    return iterator;
}
           

擷取相應的IStorageQuery接口執行個體,通過協處理器掃描HBase資料。