天天看點

Kylin源碼分析系列四—Cube查詢Kylin源碼分析系列四—Cube查詢

Kylin源碼分析系列四—Cube查詢

注:Kylin源碼分析系列基于Kylin的2.5.0版本的源碼,其他版本可以類比。

一. 簡介

前面文章介紹了Cube是如何建構的,那建構完成後使用者肯定是需要對這些預統計的資料進行相關的查詢操作,這篇文章就介紹下Kylin中是怎樣通過SQL語句來進行Cube資料的查詢的。Kylin中的查詢是在web頁面上輸入sql語句然後送出來執行相關查詢,頁面上的送出也是向Kylin的Rest Server發送restful請求,方法與前面文章介紹的Cube建構的觸發方式類似,通過angularJS發送restful請求,請求url為/kylin/api/query,Kylin的Rest Server接收到該請求後,進行Cube資料的查詢。

Kylin中使用的是Apache Calcite查詢引擎。Apache Calcite是面向 Hadoop 的查詢引擎,它提供了标準的 SQL 語言、多種查詢優化和連接配接各種資料源的能力,除此之外,Calcite 還提供了 OLAP 和流處理的查詢引擎。

Apache Calcite具有以下幾個技術特性

  • 支援标準SQL 語言;
  • 獨立于程式設計語言和資料源,可以支援不同的前端和後端;
  • 支援關系代數、可定制的邏輯規劃規則和基于成本模型優化的查詢引擎;
  • 支援物化視圖(materialized view)的管理(建立、丢棄、持久化和自動識别);
  • 基于物化視圖的 Lattice 和 Tile 機制,以應用于 OLAP 分析;
  • 支援對流資料的查詢。

這裡不詳細介紹每個特性,讀者可以自行去學習了解。Kylin之是以選擇這個查詢引擎正是由于Calcite 可以很好地支援物化視圖和星模式這些 OLAP 分析的關鍵特性。

二. 源碼解析

  Rest Server接收到查詢的RestFul請求後,根據url将其分發到QueryController控制器來進行處理:

@RequestMapping(value = "/query", method = RequestMethod.POST, produces = { "application/json" })
@ResponseBody
public SQLResponse query(@RequestBody PrepareSqlRequest sqlRequest) {
    return queryService.doQueryWithCache(sqlRequest);
}
           

  後面就由QueryService來進行查詢處理:

public SQLResponse doQueryWithCache(SQLRequest sqlRequest) {
    long t = System.currentTimeMillis();
    //檢查權限
    aclEvaluate.checkProjectReadPermission(sqlRequest.getProject());
    logger.info("Check query permission in " + (System.currentTimeMillis() - t) + " ms.");
    return doQueryWithCache(sqlRequest, false);
}
public SQLResponse doQueryWithCache(SQLRequest sqlRequest, boolean isQueryInspect) {
    Message msg = MsgPicker.getMsg();
    // 擷取使用者名
    sqlRequest.setUsername(getUserName());

    KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
    String serverMode = kylinConfig.getServerMode();
    // 服務模式不為query和all的無法進行查詢
    if (!(Constant.SERVER_MODE_QUERY.equals(serverMode.toLowerCase())
            || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase()))) {
        throw new BadRequestException(String.format(msg.getQUERY_NOT_ALLOWED(), serverMode));
    }
    // project不能為空
    if (StringUtils.isBlank(sqlRequest.getProject())) {
        throw new BadRequestException(msg.getEMPTY_PROJECT_NAME());
    }
    // project not found
    ProjectManager mgr = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
    if (mgr.getProject(sqlRequest.getProject()) == null) {
        throw new BadRequestException(msg.getPROJECT_NOT_FOUND());
    }
    // sql語句不能為空
    if (StringUtils.isBlank(sqlRequest.getSql())) {
        throw new BadRequestException(msg.getNULL_EMPTY_SQL());
    }

    // 用于儲存使用者查詢輸入的相關參數,一般用于調試
    if (sqlRequest.getBackdoorToggles() != null)
        BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles());
    // 初始化查詢上下文,設定了queryId和queryStartMillis
    final QueryContext queryContext = QueryContextFacade.current();
    // 設定新的查詢線程名
    try (SetThreadName ignored = new SetThreadName("Query %s", queryContext.getQueryId())) {
        SQLResponse sqlResponse = null;
        // 擷取查詢的sql語句
        String sql = sqlRequest.getSql();
        String project = sqlRequest.getProject();
        // 是否開啟了查詢緩存,kylin.query.cache-enabled預設開啟
        boolean isQueryCacheEnabled = isQueryCacheEnabled(kylinConfig);
        logger.info("Using project: " + project);
        logger.info("The original query:  " + sql);
        // 移除sql語句中的注釋
sql = QueryUtil.removeCommentInSql(sql);

        Pair<Boolean, String> result = TempStatementUtil.handleTempStatement(sql, kylinConfig);
        boolean isCreateTempStatement = result.getFirst();
        sql = result.getSecond();
        sqlRequest.setSql(sql);

        // try some cheap executions
        if (sqlResponse == null && isQueryInspect) {
            sqlResponse = new SQLResponse(null, null, 0, false, sqlRequest.getSql());
        }

        if (sqlResponse == null && isCreateTempStatement) {
            sqlResponse = new SQLResponse(null, null, 0, false, null);
        }
        // 緩存中直接查詢
        if (sqlResponse == null && isQueryCacheEnabled) {
            sqlResponse = searchQueryInCache(sqlRequest);
        }

        // real execution if required
        if (sqlResponse == null) {
            // 并發查詢限制, kylin.query.project-concurrent-running-threshold, 預設為0, 無 
            // 限制
            try (QueryRequestLimits limit = new QueryRequestLimits(sqlRequest.getProject())) {
                // 查詢,如有必要更新緩存
                sqlResponse = queryAndUpdateCache(sqlRequest, isQueryCacheEnabled);
            }
        }
        sqlResponse.setDuration(queryContext.getAccumulatedMillis());
        logQuery(queryContext.getQueryId(), sqlRequest, sqlResponse);
        try {
            recordMetric(sqlRequest, sqlResponse);
        } catch (Throwable th) {
            logger.warn("Write metric error.", th);
        }
        if (sqlResponse.getIsException())
            throw new InternalErrorException(sqlResponse.getExceptionMessage());
        return sqlResponse;
    } finally {
        BackdoorToggles.cleanToggles();
        QueryContextFacade.resetCurrent();
    }
}
           

  下面接着調用queryAndUpdateCache,看下具體源碼:

private SQLResponse queryAndUpdateCache(SQLRequest sqlRequest, boolean queryCacheEnabled) {
    KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
    Message msg = MsgPicker.getMsg();
    final QueryContext queryContext = QueryContextFacade.current();
    SQLResponse sqlResponse = null;
    try {
        // 判斷是不是select查詢語句
        final boolean isSelect = QueryUtil.isSelectStatement(sqlRequest.getSql());
        if (isSelect) {
            sqlResponse = query(sqlRequest, queryContext.getQueryId());
          // 查詢下推到其他的查詢引擎,比如直接通過hive查詢
        } else if (kylinConfig.isPushDownEnabled() && kylinConfig.isPushDownUpdateEnabled()) {
            sqlResponse = update(sqlRequest);
        } else {
            logger.debug("Directly return exception as the sql is unsupported, and query pushdown is disabled");
            throw new BadRequestException(msg.getNOT_SUPPORTED_SQL());
        }
. . . 
    return sqlResponse;
}
public SQLResponse query(SQLRequest sqlRequest, String queryId) throws Exception {
    SQLResponse ret = null;
    try {
        final String user = SecurityContextHolder.getContext().getAuthentication().getName();
        // 加入到查詢隊列,BadQueryDetector會對該查詢進行檢測,看是否逾時或是否為慢查詢(預設 
        // 90S)
        badQueryDetector.queryStart(Thread.currentThread(), sqlRequest, user, queryId);
        ret = queryWithSqlMassage(sqlRequest);
        return ret;
    } finally {
        String badReason = (ret != null && ret.isPushDown()) ? BadQueryEntry.ADJ_PUSHDOWN : null;
        badQueryDetector.queryEnd(Thread.currentThread(), badReason);
        Thread.interrupted(); //reset if interrupted
    }
}
private SQLResponse executeRequest(String correctedSql, SQLRequest sqlRequest, Connection conn) throws Exception {
    Statement stat = null;
    ResultSet resultSet = null;
    boolean isPushDown = false;

    Pair<List<List<String>>, List<SelectedColumnMeta>> r = null;
    try {
        stat = conn.createStatement();
        processStatementAttr(stat, sqlRequest);
        resultSet = stat.executeQuery(correctedSql);
        r = createResponseFromResultSet(resultSet); 
    } catch (SQLException sqlException) {
        r = pushDownQuery(sqlRequest, correctedSql, conn, sqlException);
        if (r == null)
            throw sqlException;
        isPushDown = true;
    } finally {
        close(resultSet, stat, null); //conn is passed in, not my duty to close
    }
    return buildSqlResponse(isPushDown, r.getFirst(), r.getSecond());
}
           

stat.executeQuery(correctedSql)接着就是calcite對SQL語句的解析優化處理,該部分内容這裡不較長的描述,具體的堆棧資訊如下:

Kylin源碼分析系列四—Cube查詢Kylin源碼分析系列四—Cube查詢

下面接着看OLAPEnumerator中的queryStorage:

private ITupleIterator queryStorage() {
    logger.debug("query storage...");
    // bind dynamic variables
    olapContext.bindVariable(optiqContext);
    olapContext.resetSQLDigest();
    SQLDigest sqlDigest = olapContext.getSQLDigest();
    // query storage engine
    // storageEngine為CubeStorageQuery,繼承GTCubeStorageQueryBase
    IStorageQuery storageEngine = StorageFactory.createQuery(olapContext.realization);
    ITupleIterator iterator = storageEngine.search(olapContext.storageContext, sqlDigest,
            olapContext.returnTupleInfo);
    if (logger.isDebugEnabled()) {
        logger.debug("return TupleIterator...");
    }
    return iterator;
}
           

然後調用GTCubeStorageQueryBase的search方法,在該方法中為每個cube segment建立一個CubeSegmentScanner:

public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
    // 這一步有個很重要的步驟就是根據查詢條件找到對應的cuboid(findCuboid)
    GTCubeStorageQueryRequest request = getStorageQueryRequest(context, sqlDigest, returnTupleInfo);
    List<CubeSegmentScanner> scanners = Lists.newArrayList();
    SegmentPruner segPruner = new SegmentPruner(sqlDigest.filter);
    for (CubeSegment cubeSeg : segPruner.listSegmentsForQuery(cubeInstance)) {
        CubeSegmentScanner scanner;
        scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), //
                request.getGroups(), request.getDynGroups(), request.getDynGroupExprs(), //
                request.getMetrics(), request.getDynFuncs(), //
                request.getFilter(), request.getHavingFilter(), request.getContext());
        if (!scanner.isSegmentSkipped())
            scanners.add(scanner);
    }
    if (scanners.isEmpty())
        return ITupleIterator.EMPTY_TUPLE_ITERATOR;
    return new SequentialCubeTupleIterator(scanners, request.getCuboid(), request.getDimensions(),
            request.getDynGroups(), request.getGroups(), request.getMetrics(), returnTupleInfo, request.getContext(), sqlDigest);
}

public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, //
        Set<TblColRef> groups, List<TblColRef> dynGroups, List<TupleExpression> dynGroupExprs, //
        Collection<FunctionDesc> metrics, List<DynamicFunctionDesc> dynFuncs, //
        TupleFilter originalfilter, TupleFilter havingFilter, StorageContext context) {
    logger.info("Init CubeSegmentScanner for segment {}", cubeSeg.getName());
    this.cuboid = cuboid;
    this.cubeSeg = cubeSeg;
    //the filter might be changed later in this CubeSegmentScanner (In ITupleFilterTransformer)
    //to avoid issues like in https://issues.apache.org/jira/browse/KYLIN-1954, make sure each CubeSegmentScanner
    //is working on its own copy
    byte[] serialize = TupleFilterSerializer.serialize(originalfilter, StringCodeSystem.INSTANCE);
    TupleFilter filter = TupleFilterSerializer.deserialize(serialize, StringCodeSystem.INSTANCE);
    // translate FunctionTupleFilter to IN clause
    ITupleFilterTransformer translator = new BuiltInFunctionTransformer(cubeSeg.getDimensionEncodingMap());
    filter = translator.transform(filter);
    CubeScanRangePlanner scanRangePlanner;
    try {
        scanRangePlanner = new CubeScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, dynGroups,
                dynGroupExprs, metrics, dynFuncs, havingFilter, context);
    } catch (RuntimeException e) {
        throw e;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    scanRequest = scanRangePlanner.planScanRequest();
    // gtStorage為配置項kylin.storage.hbase.gtstorage, 預設值為
    // org.apache.kylin.storage.hbase.cube.v2.CubeHBaseEndpointRPC
    String gtStorage = ((GTCubeStorageQueryBase) context.getStorageQuery()).getGTStorage();
    scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage, context);
}
           

然後在CubeSegmentScanner中建構ScannerWorker:

public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage,
        StorageContext context) {
    inputArgs = new Object[] { segment, cuboid, scanRequest, gtStorage, context };
    if (scanRequest == null) {
        logger.info("Segment {} will be skipped", segment);
        internal = new EmptyGTScanner();
        return;
    }
    final GTInfo info = scanRequest.getInfo();
    try {
        // 這裡的rpc為org.apache.kylin.storage.hbase.cube.v2.CubeHBaseEndpointRPC
        IGTStorage rpc = (IGTStorage) Class.forName(gtStorage)
                .getConstructor(ISegment.class, Cuboid.class, GTInfo.class, StorageContext.class)
                .newInstance(segment, cuboid, info, context); // default behavior
         // internal為每個segment的查詢結果,後面會調用iterator擷取結果,calcite會将各個segment 
         // 的結果進行聚合, EnumerableDefaults中的aggregate
        internal = rpc.getGTScanner(scanRequest);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    checkNPE();
}
           

接着調用CubeHBaseEndpointRPC中的getGTScanner方法,然後調用runEPRange方法:

private void runEPRange(final QueryContext queryContext, final String logHeader, final boolean compressionResult,
        final CubeVisitProtos.CubeVisitRequest request, final Connection conn, byte[] startKey, byte[] endKey,
        final ExpectedSizeIterator epResultItr) {
    final String queryId = queryContext.getQueryId();
    try {
        final Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()),
                HBaseConnection.getCoprocessorPool());
        table.coprocessorService(CubeVisitService.class, startKey, endKey, //
                new Batch.Call<CubeVisitService, CubeVisitResponse>() {
                    public CubeVisitResponse call(CubeVisitService rowsService) throws IOException {
                        . . .
                        ServerRpcController controller = new ServerRpcController();
                        BlockingRpcCallback<CubeVisitResponse> rpcCallback = new BlockingRpcCallback<>();
                        try {
                            //發送請求到hbase的協處理器進行資料查詢
                            rowsService.visitCube(controller, request, rpcCallback);
                            CubeVisitResponse response = rpcCallback.get();
                            if (controller.failedOnException()) {
                                throw controller.getFailedOn();
                            }
                            return response;
                        } catch (Exception e) {
                            throw e;
                        } finally {
                            // Reset the interrupted state
                            Thread.interrupted();
                        }
                    }
                }, new Batch.Callback<CubeVisitResponse>() {
                    // 接收到協處理器發回的查詢結果
                    @Override
                    public void update(byte[] region, byte[] row, CubeVisitResponse result) {
                        . . .
                        // 擷取hbase協處理器傳回的查詢結果中的相關狀态資料
                        Stats stats = result.getStats();
                        queryContext.addAndGetScannedRows(stats.getScannedRowCount());
                        queryContext.addAndGetScannedBytes(stats.getScannedBytes());
                        queryContext.addAndGetReturnedRows(stats.getScannedRowCount()
                                - stats.getAggregatedRowCount() - stats.getFilteredRowCount());
                        RuntimeException rpcException = null;
                        if (result.getStats().getNormalComplete() != 1) {
                            // record coprocessor error if happened
                            rpcException = getCoprocessorException(result);
                        }
                        queryContext.addRPCStatistics(storageContext.ctxId, stats.getHostname(),
                                cubeSeg.getCubeDesc().getName(), cubeSeg.getName(), cuboid.getInputID(),
                                cuboid.getId(), storageContext.getFilterMask(), rpcException,
                                stats.getServiceEndTime() - stats.getServiceStartTime(), 0,
                                stats.getScannedRowCount(),
                                stats.getScannedRowCount() - stats.getAggregatedRowCount()
                                        - stats.getFilteredRowCount(),
                                stats.getAggregatedRowCount(), stats.getScannedBytes());
                        if (queryContext.getScannedBytes() > cubeSeg.getConfig().getQueryMaxScanBytes()) {
                            rpcException = new ResourceLimitExceededException(
                                    "Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold "
                                            + cubeSeg.getConfig().getQueryMaxScanBytes());
                        } else if (queryContext.getReturnedRows() > cubeSeg.getConfig().getQueryMaxReturnRows()) {
                            rpcException = new ResourceLimitExceededException(
                                    "Query returned " + queryContext.getReturnedRows() + " rows exceeds threshold "
                                            + cubeSeg.getConfig().getQueryMaxReturnRows());
                        }
                        if (rpcException != null) {
                            queryContext.stop(rpcException);
                            return;
                        }
                        try {
                            // 對傳回的查詢結果資料進行處理(查詢結果資料可能被壓縮)
                            if (compressionResult) {
                                epResultItr.append(CompressionUtils.decompress(
                                        HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows())));
                            } else {
                                epResultItr.append(
                                        HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()));
                            }
                        } catch (IOException | DataFormatException e) {
                            throw new RuntimeException(logHeader + "Error when decompressing", e);
                        }
                    }
                });
    } catch (Throwable ex) {
        queryContext.stop(ex);
    }
      . . .
   }
           

Kylin通過發送visitCube請求到HBase協處理器進行查詢,協處理器中執行的函數位于CubeVisitService中,函數名為visitCube:

public void visitCube(final RpcController controller, final CubeVisitProtos.CubeVisitRequest request,
        RpcCallback<CubeVisitProtos.CubeVisitResponse> done) {
    List<RegionScanner> regionScanners = Lists.newArrayList();
    HRegion region = null;
    StringBuilder sb = new StringBuilder();
    byte[] allRows;
    String debugGitTag = "";
    CubeVisitProtos.CubeVisitResponse.ErrorInfo errorInfo = null;
    // if user change kylin.properties on kylin server, need to manually redeploy coprocessor jar to update KylinConfig of Env.
    KylinConfig kylinConfig = KylinConfig.createKylinConfig(request.getKylinProperties());
    // 擷取請求中的查詢ID
    String queryId = request.hasQueryId() ? request.getQueryId() : "UnknownId";
    logger.info("start query {} in thread {}", queryId, Thread.currentThread().getName());
    try (SetAndUnsetThreadLocalConfig autoUnset = KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig);
            SetThreadName ignored = new SetThreadName("Query %s", queryId)) { 
        final long serviceStartTime = System.currentTimeMillis();
        region = (HRegion) env.getRegion();
        region.startRegionOperation();
        debugGitTag = region.getTableDesc().getValue(IRealizationConstants.HTableGitTag);
        final GTScanRequest scanReq = GTScanRequest.serializer
                .deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest())));
        // 擷取查詢逾時時間
        final long deadline = scanReq.getStartTime() + scanReq.getTimeout();
        checkDeadline(deadline);
        List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
        // 擷取要查詢的hbase的 Column列(例如,F1:M) 
        for (IntList intList : request.getHbaseColumnsToGTList()) {
            hbaseColumnsToGT.add(intList.getIntsList());
        }
        StorageSideBehavior behavior = StorageSideBehavior.valueOf(scanReq.getStorageBehavior());
        // 從request請求體中獲RawScan
        final List<RawScan> hbaseRawScans = deserializeRawScans(
                ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));
        appendProfileInfo(sb, "start latency: " + (serviceStartTime - scanReq.getStartTime()), serviceStartTime);
        final List<InnerScannerAsIterator> cellListsForeachRawScan = Lists.newArrayList();
        for (RawScan hbaseRawScan : hbaseRawScans) {
            if (request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN > 0) {
                //if has shard, fill region shard to raw scan start/end
                updateRawScanByCurrentRegion(hbaseRawScan, region,
                        request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN);
            }
            // 根據RawScan來建構HBase的Scan(确定startRow,stopRow,fuzzyKeys和hbase 
            // columns)
            Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan);
            RegionScanner innerScanner = region.getScanner(scan);
            regionScanners.add(innerScanner);
            InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner);
            cellListsForeachRawScan.add(cellListIterator);
        }
        final Iterator<List<Cell>> allCellLists = Iterators.concat(cellListsForeachRawScan.iterator());
        if (behavior.ordinal() < StorageSideBehavior.SCAN.ordinal()) {
            //this is only for CoprocessorBehavior.RAW_SCAN case to profile hbase scan speed
            List<Cell> temp = Lists.newArrayList();
            int counter = 0;
            for (RegionScanner innerScanner : regionScanners) {
                while (innerScanner.nextRaw(temp)) {
                    counter++;
                }
            }
            appendProfileInfo(sb, "scanned " + counter, serviceStartTime);
        }
        if (behavior.ordinal() < StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
            scanReq.disableAggCacheMemCheck(); // disable mem check if so told
        }
        final long storagePushDownLimit = scanReq.getStoragePushDownLimit();
        ResourceTrackingCellListIterator cellListIterator = new ResourceTrackingCellListIterator(allCellLists,
                scanReq.getStorageScanRowNumThreshold(), // for old client (scan threshold)
                !request.hasMaxScanBytes() ? Long.MAX_VALUE : request.getMaxScanBytes(), // for new client
                deadline);
        IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns,
                hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn(),
                request.getIsExactAggregate()); 
        IGTScanner rawScanner = store.scan(scanReq);
        // 這裡會根據查詢中是否有聚合來将rawScanner進行包裝,包裝成GTAggregateScanner來對這個 
        // region中查詢出來的資料進行聚合操作
        IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, behavior.filterToggledOn(),
                behavior.aggrToggledOn(), false, request.getSpillEnabled());
        ByteBuffer buffer = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);//ByteArrayOutputStream will auto grow
        long finalRowCount = 0L;
        try {
 // 對查詢的每條Record進行處理
 for (GTRecord oneRecord : finalScanner) {
                buffer.clear();
                try {
                    oneRecord.exportColumns(scanReq.getColumns(), buffer);
                } catch (BufferOverflowException boe) {
                    buffer = ByteBuffer.allocate(oneRecord.sizeOf(scanReq.getColumns()) * 2);
                    oneRecord.exportColumns(scanReq.getColumns(), buffer);
                }
                outputStream.write(buffer.array(), 0, buffer.position());
                finalRowCount++;
                //if it's doing storage aggr, then should rely on GTAggregateScanner's limit check
                if (!scanReq.isDoingStorageAggregation()
                        && (scanReq.getStorageLimitLevel() != StorageLimitLevel.NO_LIMIT
                                && finalRowCount >= storagePushDownLimit)) {
                    //read one more record than limit
                    logger.info("The finalScanner aborted because storagePushDownLimit is satisfied");
                    break;
                }
            }
        } catch (KylinTimeoutException e) {
            logger.info("Abort scan: {}", e.getMessage());
            errorInfo = CubeVisitProtos.CubeVisitResponse.ErrorInfo.newBuilder()
                    .setType(CubeVisitProtos.CubeVisitResponse.ErrorType.TIMEOUT).setMessage(e.getMessage())
                    .build();
        } catch (ResourceLimitExceededException e) {
            logger.info("Abort scan: {}", e.getMessage());
            errorInfo = CubeVisitProtos.CubeVisitResponse.ErrorInfo.newBuilder()
                    .setType(CubeVisitProtos.CubeVisitResponse.ErrorType.RESOURCE_LIMIT_EXCEEDED)
                    .setMessage(e.getMessage()).build();
        } finally {
            finalScanner.close();
        }
        long rowCountBeforeAggr = finalScanner instanceof GTAggregateScanner
                ? ((GTAggregateScanner) finalScanner).getInputRowCount()
                : finalRowCount;
        appendProfileInfo(sb, "agg done", serviceStartTime);
        logger.info("Total scanned {} rows and {} bytes", cellListIterator.getTotalScannedRowCount(),
                cellListIterator.getTotalScannedRowBytes());
        //outputStream.close() is not necessary
        byte[] compressedAllRows;
        if (errorInfo == null) {
            allRows = outputStream.toByteArray();
        } else {
            allRows = new byte[0];
        }
        if (!kylinConfig.getCompressionResult()) {
            compressedAllRows = allRows;
        } else {
            // 對結果進行壓縮傳輸,減少網絡傳輸資料量
            compressedAllRows = CompressionUtils.compress(allRows);
        }
        appendProfileInfo(sb, "compress done", serviceStartTime);
        logger.info("Size of final result = {} ({} before compressing)", compressedAllRows.length, allRows.length);
        OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory
                .getOperatingSystemMXBean();
        double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad();
        double freePhysicalMemorySize = operatingSystemMXBean.getFreePhysicalMemorySize();
        double freeSwapSpaceSize = operatingSystemMXBean.getFreeSwapSpaceSize();
        appendProfileInfo(sb, "server stats done", serviceStartTime);
        sb.append(" debugGitTag:" + debugGitTag);
        CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder();
        if (errorInfo != null) {
            responseBuilder.setErrorInfo(errorInfo);
        }
        // 向請求端發送查詢結果
        done.run(responseBuilder.//
                setCompressedRows(HBaseZeroCopyByteString.wrap(compressedAllRows)).//too many array copies 
                setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder()
                        .setFilteredRowCount(cellListIterator.getTotalScannedRowCount() - rowCountBeforeAggr)
                        .setAggregatedRowCount(rowCountBeforeAggr - finalRowCount)
                        .setScannedRowCount(cellListIterator.getTotalScannedRowCount())
                        .setScannedBytes(cellListIterator.getTotalScannedRowBytes())
                        .setServiceStartTime(serviceStartTime).setServiceEndTime(System.currentTimeMillis())
                        .setSystemCpuLoad(systemCpuLoad).setFreePhysicalMemorySize(freePhysicalMemorySize)
                        .setFreeSwapSpaceSize(freeSwapSpaceSize)
                        .setHostname(InetAddress.getLocalHost().getHostName()).setEtcMsg(sb.toString())
                        .setNormalComplete(errorInfo == null ? 1 : 0).build())
                .build());
    } catch (DoNotRetryIOException e) {
        . . .
           } catch (IOException ioe) {
             . . .
           } finally {
                  . . .
               }
       . . .
    }
}
           

例子:

Cube的涉及次元如下:

Kylin源碼分析系列四—Cube查詢Kylin源碼分析系列四—Cube查詢

度量為:

Kylin源碼分析系列四—Cube查詢Kylin源碼分析系列四—Cube查詢

次元和rowKey設計如下:

Kylin源碼分析系列四—Cube查詢Kylin源碼分析系列四—Cube查詢

針對查詢語句:select minute_start, count(*), sum(amount), sum(qty) from kylin_streaming_table where user_age in(10,11,12,13,14,15) and country in('CHINA','CANADA','INDIA') group by minute_start order by minute_start

如上述代碼流程所示:

首先會根據查詢涉及的列計算出cuboid的id為265(100001001),由于涉及minute_start,而minute_start、hour_start和day_start為衍生次元,是以最終的cuboid為457(111001001),後面會根據查詢的條件計算出scan,包括範圍(5個次元列和3個度量列)為[null, null, null, 10, CANADA, null, null, null](pkStart)到[null, null, null, 15, INDIA, null, null, null](pkEnd)(後面的三個null值會被忽略掉)和根據笛卡爾積會計算出18個filter值(fuzzyKeys):

Kylin源碼分析系列四—Cube查詢Kylin源碼分析系列四—Cube查詢

用于後面查詢過濾(使用FuzzyRowFilter過濾器);還有就是查詢hbase涉及的column也會根據查詢語句中涉及的列來進行确定。然後後面會使用getGTScanner中的preparedHBaseScans來對scan的range(pkStart和pkEnd)和fuzzyKeys進行編碼轉化然後序列化形成請求體中的hbaseRawScan,後面的hbase協處理器就是用這個參數來建構HBase的Scan進行查詢。

三. 總結

        之前有測試Kylin的查詢,發現其查詢性能非常穩定,不會随查詢的資料量的增長而大幅的增長,通過上面的源碼分析基本可以知道其原因,Kylin通過Calcite将SQL語句解析優化後,得到具體的hbase的scan查詢,然後使用hbase的協處理器(endpoint模式)來查詢,将查詢請求通過protobuf協定發送到hbase的regionServer,然後通過協處理器來進行過濾查詢和初步聚合,最後會将查詢結果進行壓縮然後發回請求端,然後再進一步聚合得到最終的查詢結果。

繼續閱讀