天天看點

Kylin源碼分析系列二—Cube建構Kylin源碼分析系列二—Cube建構

Kylin源碼分析系列二—Cube建構

注:Kylin源碼分析系列基于Kylin的2.5.0版本的源碼,其他版本可以類比。

1.建構流程

前面一篇文章介紹了Kylin中的任務排程服務,本篇文章正式介紹Kylin的核心内容Cube,主要講述Cube建構的過程。下面的建構過程選擇使用spark建構引擎來說明(MR引擎自行類比閱讀相關源碼)。

首先介紹下Cube建構的整體流程,看下kylin web頁面上展示的建構過程:

Kylin源碼分析系列二—Cube建構Kylin源碼分析系列二—Cube建構

主要有如下幾個步驟:

  1. 首先建立一個大平表(Flat Hive Table),該表的資料是将建立cube涉及到的次元從原有的事實表和次元表中查詢出來組成一條完整的資料插入到一個新的hive表中;後續的cube建構就是基于這個表的。抽取資料的過程使用的是Hive指令,Kylin使用conf/kylin_hive_conf.xml配置檔案中的配置項,使用者可以根據需求修改和添加相關配置項。
  2. 經過第一步後,Hive會在HDFS目錄下生成一些資料檔案,這些資料檔案可能大小不一,這就會導緻後續的任務執行不均衡,有些任務執行很快,有些可能會很慢。為了是這些資料分布更均勻,Kylin增加了該步驟來重新配置設定各個資料檔案中的資料。執行如下hive指令:
Kylin源碼分析系列二—Cube建構Kylin源碼分析系列二—Cube建構

     3. 接着Kylin擷取次元列的distinct值(即次元基數),用于後面一步進行字典編碼。

     4. 這一步就根據前面獲得的次元的distinct值來建構字典,通常這一步會很快,但是如果distinct值的集合很大,Kylin可能會報           錯,例如,“Too high cardinality is not suitable for dictionary”。對于UHC(超大次元基數)列,請使用其他編碼方式,例             如“fixed_length”,“integer”等。

     5. 這步操作很簡單,隻是儲存cube的一些相關統計資料,比如有多少cuboid,每個cuboid有多少行資料等。

     6. 這一步是建立儲存cube資料的hbase表,目前的版本cube資料隻支援儲存到hbase中,kylin社群目前正在開發将cube資料            直接儲存為parquet格式檔案(适用于雲上環境);這裡有一點需要說明一下,在建表的時候啟用了hbase協處理的功能              (endpoint模式),需要将協處理器的相關jar包deploy到對應的hbase表上,後面會詳細介紹,這樣做是為了提升Kylin的查             詢性能。

     7. 這裡就是真正的建立cube了,本文的描述是基于spark建構引擎的,使用的by layer的方式建構的,即先建構Base                         Cuboid,然後一層一層的往上聚合,得到其他的cuboid的資料;當使用MR引擎的時候,可以配置cube建構算法,通過                 kylin.cube.algorithm來配置,值有[“auto”, “layer”, “inmem”],預設值為auto,使用者根據環境的資源情況來進行配置,使用             auto的時候,kylin會根據系統資源情況來選擇layer還是inmem,layer算法是一層一層的計算,需要的資源較少,但是花費           的時間可能會更長,而使用inmem算法則建構的更快,但是會消耗更多的記憶體,具體可以參考                                                       https://blog.csdn.net/sunnyyoona/article/details/52318176。

     8. 這一步将Cuboid資料轉化為HFile檔案。

     9. 将轉化後的HFile檔案直接load到HBase裡面供後續查詢使用。

   10. 更新Cube的相關資訊。

   11. 清理Hive中的臨時資料。

2.源碼分析

下面從源碼來看Cube的建構過程:

在Kylin頁面上點選build後,觸發的是一個任務送出的流程,該任務送出的流程簡要介紹下:

1.頁面點選Submit按鈕,通過js觸發rebuild事件,發送restful請求:

Kylin源碼分析系列二—Cube建構Kylin源碼分析系列二—Cube建構

rebuild的具體處理源碼在webapp/app/js/controllers/cubes.js中:

Kylin源碼分析系列二—Cube建構Kylin源碼分析系列二—Cube建構

最終調用restful api接口/kylin/api/cubes/{cubeName}/rebuild将請求發送至服務端,CubeService定義在webapp/app/js/services/cubes.js。

2.Rest Server服務端接收到restful請求,根據請求的URL将請求分發到對應的控制器進行處理(使用了Spring的@Controller和@RequestMapping注解),這裡的Cube建構請求最終被分發到CubeController控制器由rebuild函數進行處理:

/** Build/Rebuild a cube segment */
/**
 * Build/Rebuild a cube segment
 */
@RequestMapping(value = "/{cubeName}/rebuild", method = { RequestMethod.PUT }, produces = { "application/json" })
@ResponseBody
public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest req) {
    return buildInternal(cubeName, new TSRange(req.getStartTime(), req.getEndTime()), null, null, null,
            req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment());
}
           

然後看buildInternal函數:

private JobInstance buildInternal(String cubeName, TSRange tsRange, SegmentRange segRange, //
        Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd,
        String buildType, boolean force) {
    try {
        //擷取送出任務的使用者的使用者名
        String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
        //擷取Cube執行個體
        CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
        //檢測有多少個處于即将建構的狀态的job,預設隻能同時提10個job,大于則會抛異常,送出失敗
        checkBuildingSegment(cube);
        //通過jobService來送出任務,即為上篇文章介紹的Cube任務排程服務
        return jobService.submitJob(cube, tsRange, segRange, sourcePartitionOffsetStart, sourcePartitionOffsetEnd,
                CubeBuildTypeEnum.valueOf(buildType), force, submitter);
    } catch (Throwable e) {
        logger.error(e.getLocalizedMessage(), e);
        throw new InternalErrorException(e.getLocalizedMessage(), e);
    }
}
           

然後看JobService中的submitJob,該函數隻是做了權限認證,然後直接調用了submitJobInternal:

public JobInstance submitJobInternal(CubeInstance cube, TSRange tsRange, SegmentRange segRange, //
        Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, //
        CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException {
. . .
        try {
        if (buildType == CubeBuildTypeEnum.BUILD) {
            //擷取資料源類型(HiveSource、JdbcSource、KafkaSource)
            ISource source = SourceManager.getSource(cube);
            //資料範圍
            SourcePartition src = new SourcePartition(tsRange, segRange, sourcePartitionOffsetStart,
                    sourcePartitionOffsetEnd);
            //kafka資料源确定start offset和endoffset
            src = source.enrichSourcePartitionBeforeBuild(cube, src);
            //添加segment
            newSeg = getCubeManager().appendSegment(cube, src);
            //通過建構引擎來建構Job
            job = EngineFactory.createBatchCubingJob(newSeg, submitter);
        } else if (buildType == CubeBuildTypeEnum.MERGE) {
            newSeg = getCubeManager().mergeSegments(cube, tsRange, segRange, force);
            job = EngineFactory.createBatchMergeJob(newSeg, submitter);
        } else if (buildType == CubeBuildTypeEnum.REFRESH) {
            newSeg = getCubeManager().refreshSegment(cube, tsRange, segRange);
            job = EngineFactory.createBatchCubingJob(newSeg, submitter);
        } else {
            throw new BadRequestException(String.format(msg.getINVALID_BUILD_TYPE(), buildType));
        }
        //送出任務,可以參考前面任務排程的文章了解任務具體是怎麼執行的
        getExecutableManager().addJob(job);
    } catch (Exception e) {
      . . . 
    }
    JobInstance jobInstance = getSingleJobInstance(job);
    return jobInstance;
}
           

接着看EngineFactory.createBatchCubingJob方法,根據cube執行個體中配置的引擎類型來确定使用什麼引擎,目前有mapreduce和spark兩種引擎,開發者也可以添加自己的建構引擎(通過kylin.engine.provider加入)。下面以spark引擎來繼續分析,後面直接到SparkBatchCubingJobBuilder2的build,這個函數就是cube建構任務的核心:

public CubingJob build() {
    logger.info("Spark new job to BUILD segment " + seg);
    //建構job任務(DefaultChainedExecutable類型,是一個任務鍊)
    final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
    final String jobId = result.getId();
    //擷取cuboid在hdfs上的資料目錄
    final String cuboidRootPath = getCuboidRootPath(jobId);
    // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
    inputSide.addStepPhase1_CreateFlatTable(result);
    // Phase 2: Build Dictionary
    // 擷取次元列的distinct值(即次元基數)
    result.addTask(createFactDistinctColumnsSparkStep(jobId));
    // 針對高基數次元(Ultra High Cardinality)單獨起MR任務來建構字典,主要是ShardByColumns
    // 和GlobalDictionaryColumns
    if (isEnableUHCDictStep()) {
        result.addTask(createBuildUHCDictStep(jobId));
    }
    // 建立次元字典
    result.addTask(createBuildDictionaryStep(jobId));
    // 儲存一些統計資料
    result.addTask(createSaveStatisticsStep(jobId));
    // add materialize lookup tables if needed
    LookupMaterializeContext lookupMaterializeContext = addMaterializeLookupTableSteps(result);
    // 建立hbase表
    outputSide.addStepPhase2_BuildDictionary(result);
    // Phase 3: Build Cube
    addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
    //将上一步計算後的cuboid檔案轉換成hfile,然後将hfile load到hbase的表中
    outputSide.addStepPhase3_BuildCube(result);
    // Phase 4: Update Metadata & Cleanup
    result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
    inputSide.addStepPhase4_Cleanup(result);
    outputSide.addStepPhase4_Cleanup(result);

    return result;
}
           

上述代碼中的流程與頁面上的建構過程基本一緻,下面詳細看下Cube計算這個步驟的實作過程,即addLayerCubingSteps(result, jobId, cuboidRootPath)。

protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
    final SparkExecutable sparkExecutable = new SparkExecutable();
    // 設定cube計算的類
    sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
    // 配置spark任務,主要為資料來源和cuboid資料儲存位置
    configureSparkJob(seg, sparkExecutable, jobId, cuboidRootPath);
    // task加入到job中
    result.addTask(sparkExecutable);
}
           

接着看SparkCubingByLayer中的execute方法,最終任務排程服務排程執行job中的該task時,是調用execute方法來執行的,具體的調用過程可以參考上一篇任務排程的文章:

protected void execute(OptionsHelper optionsHelper) throws Exception {
    String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
    String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
    String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
    String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
    String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
    String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
    Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1") };
    SparkConf conf = new SparkConf().setAppName("Cubing for:" + cubeName + " segment " + segmentId);
    //serialization conf
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
    conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
    KylinSparkJobListener jobListener = new KylinSparkJobListener();
    JavaSparkContext sc = new JavaSparkContext(conf);
    sc.sc().addSparkListener(jobListener);
    // 清空cuboid檔案目錄
    HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
    SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress
    final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
    KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);

    final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
    final CubeDesc cubeDesc = cubeInstance.getDescriptor();
    final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);

    logger.info("RDD input path: {}", inputPath);
    logger.info("RDD Output path: {}", outputPath);

    final Job job = Job.getInstance(sConf.get());
    SparkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);

    int countMeasureIndex = 0;
    for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
        if (measureDesc.getFunction().isCount() == true) {
            break;
        } else {
            countMeasureIndex++;
        }
    }
    final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
    boolean[] needAggr = new boolean[cubeDesc.getMeasures().size()];
    boolean allNormalMeasure = true;
    for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
        // RawMeasureType這裡為true,其他均為false
        needAggr[i] = !cubeDesc.getMeasures().get(i).getFunction().getMeasureType().onlyAggrInBaseCuboid();
        allNormalMeasure = allNormalMeasure && needAggr[i];
    }
    logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure);
    StorageLevel storageLevel = StorageLevel.fromString(envConfig.getSparkStorageLevel());
    // 預設為true
    boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
    // 從hive資料源表中建構出RDD,hiveRecordInputRDD得到格式為每行資料的每列的值的
    // RDD(JavaRDD<String[]>),maptoPair是按照basecubiod(每個次元都包含),計算出格式為 
    // rowkey(shard id+cuboid id+values)和每列的值的RDD encodedBaseRDD
    final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable)
            .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));

    Long totalCount = 0L;
    // 預設為false
    if (envConfig.isSparkSanityCheckEnabled()) {
    // 資料總條數
        totalCount = encodedBaseRDD.count();
    }
    // 聚合路徑成本的具體方法
    final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(cubeName, metaUrl, sConf);
    BaseCuboidReducerFunction2 reducerFunction2 = baseCuboidReducerFunction;
    // 度量沒有RAW的為true
    if (allNormalMeasure == false) {
        reducerFunction2 = new CuboidReducerFunction2(cubeName, metaUrl, sConf, needAggr);
    }

    final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
    JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels + 1];
    int level = 0;
    int partition = SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);

    // aggregate to calculate base cuboid
    allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel);
    // 資料儲存到hdfs上
    saveToHDFS(allRDDs[0], metaUrl, cubeName, cubeSegment, outputPath, 0, job, envConfig);
    // 根據base cuboid上卷聚合各個層級的資料,改變資料的rowKey,去掉相應的次元
       PairFlatMapFunction flatMapFunction = new CuboidFlatMap(cubeName, segmentId, 
       metaUrl, sConf);
    // aggregate to ND cuboids
    for (level = 1; level <= totalLevels; level++) {
        partition = SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
        // flatMapToPair得到上卷聚合後的資料,reduceByKey再進一步根據新的rowKey進行聚合操作, 
           因為進行flatMapToPair操作後會有部分資料的rowKey值相同
        allRDDs[level] = allRDDs[level - 1].flatMapToPair(flatMapFunction).reduceByKey(reducerFunction2, partition)
                .persist(storageLevel);
        allRDDs[level - 1].unpersist();
        if (envConfig.isSparkSanityCheckEnabled() == true) {
            sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
        }
        saveToHDFS(allRDDs[level], metaUrl, cubeName, cubeSegment, outputPath, level, job, envConfig);
    }
    allRDDs[totalLevels].unpersist();
    logger.info("Finished on calculating all level cuboids.");
    logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
    //HadoopUtil.deleteHDFSMeta(metaUrl);
}
           

        Cube在建構完所有的cuboid,原始的cuboid檔案會存到hdfs目錄下(例:/kylin/kylin_metadata/kylin-43be1d7f-4a50-b3a8-6dea-b998acec2d7b/kylin_sales_cube/cuboid),後面的createConvertCuboidToHfileStep任務會将cuboid檔案轉換成hfile檔案儲存到/kylin/kylin_metadata/kylin-43be1d7f-4a50-b3a8-6dea-b998acec2d7b/kylin_sales_cube/hfile目錄下,最後會由createBulkLoadStep任務将hfile檔案load到hbase表中(後面hfile目錄會被删除),這樣就完成了Cube的建構。這裡需要注意的是cuboid檔案在Cube建構完成後不會被删除,因為後面做Cube Segment的merge操作時是直接用已有的cuboid檔案,而不需要重新進行計算,加快合并的速度,如果你确認後面不會進行segment的合并操作,cuboid檔案可以手動删除掉以節省hdfs的存儲空間。

繼續閱讀