/** Build a new cube segment, typically its time range appends to the end of current cube. */
public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
    return batchEngine(newSegment).createBatchCubingJob(newSegment, submitter);
}
MRBatchCubingEngine2
public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
    return new BatchCubingJobBuilder2(newSegment, submitter).build();
}
public CubingJob build() {
    logger.info("MR_V2 new job to BUILD segment " + seg);

    final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
    final String jobId = result.getId();
    final String cuboidRootPath = getCuboidRootPath(jobId);
public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
    final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
    final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName)
            .getConfig();
    final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);

    // create flat table first
    addStepPhase1_DoCreateFlatTable(jobFlow);

    // then count and redistribute
    if (cubeConfig.isHiveRedistributeEnabled()) {
        jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName));
    }

    // special for hive
    addStepPhase1_DoMaterializeLookupTable(jobFlow);
}
private AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir,
        String cubeName) {
    // from hive to hive
    final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
    final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
    String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);
    // get the flat Hive table size
    Matcher matcher = HDFS_LOCATION.matcher(cmd);
    if (matcher.find()) {
        String hiveFlatTableHdfsUrl = matcher.group(1);
        long size = getFileSize(hiveFlatTableHdfsUrl);
        info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, "" + size);
        logger.info("HDFS_Bytes_Writen: " + size);
    }
    getManager().addJobInfo(getId(), info);
    if (response.getFirst() != 0) {
        throw new RuntimeException("Failed to create flat hive table, error code " + response.getFirst());
    }
}
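The getFileSize(...) helper is not shown in the excerpt; a minimal sketch of how such a helper could be written with Hadoop's FileSystem.getContentSummary (the class and method names here are assumptions for illustration, not the confirmed Kylin implementation):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsSizeUtil {

    // Hypothetical helper: total number of bytes under the given HDFS path.
    public static long getFileSize(String hdfsUrl) throws IOException {
        Path path = new Path(hdfsUrl);
        FileSystem fs = path.getFileSystem(new Configuration());
        // getContentSummary() walks the directory tree and sums the file lengths.
        return fs.getContentSummary(path).getLength();
    }
}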
Phase 2: Build Dictionary
Phase 3: Build Cube
BatchCubingJobBuilder2
// Phase 3: Build Cube
addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
addInMemCubingSteps(result, jobId, cuboidRootPath); // inmem cubing, only selected algorithm will execute
outputSide.addStepPhase3_BuildCube(result);
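Note that the steps of both algorithms are added to the job up front; at runtime, once the cubing algorithm has been chosen, only the matching steps do real work and the others are skipped. A minimal sketch of what such a guard could look like (the class and method names below are hypothetical and simplified, not Kylin's actual skip mechanism):

// Hypothetical guard: a step that only does real work when the job's
// selected cubing algorithm matches the algorithm this step belongs to.
public abstract class AlgorithmGuardedStep {

    public enum CubingAlgorithm { LAYER, INMEM }

    private final CubingAlgorithm stepAlgorithm;

    protected AlgorithmGuardedStep(CubingAlgorithm stepAlgorithm) {
        this.stepAlgorithm = stepAlgorithm;
    }

    // Called by the scheduler; skips silently when another algorithm was selected,
    // so the chained job can still continue to the next step.
    public final void execute(CubingAlgorithm selectedAlgorithm) {
        if (selectedAlgorithm != stepAlgorithm) {
            return;
        }
        doWork();
    }

    protected abstract void doWork();
}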
protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
    // Don't know statistics so that tree cuboid scheduler is not determined. Determine the maxLevel at runtime
    final int maxLevel = CuboidUtil.getLongestDepth(seg.getCuboidScheduler().getAllCuboidIds());
    // base cuboid step
    result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId));
    // n dim cuboid steps
    for (int i = 1; i <= maxLevel; i++) {
        result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i - 1),
                getCuboidOutputPathsByLevel(cuboidRootPath, i), i, jobId));
    }
}
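Each level writes its cuboids to its own directory under cuboidRootPath, and level i reads level i-1's output. A sketch of what a per-level path helper could look like (the directory naming here is an assumption for illustration, not necessarily Kylin's real layout):

// Hypothetical layout: one sub-directory per cuboid level under the job's cuboid root path.
public class CuboidPathUtil {

    public static String getCuboidOutputPathsByLevel(String cuboidRootPath, int level) {
        String root = cuboidRootPath.endsWith("/") ? cuboidRootPath : cuboidRootPath + "/";
        if (level == 0) {
            return root + "level_base_cuboid";      // level 0 holds the base cuboid
        }
        return root + "level_" + level + "_cuboid"; // level i is built from level i-1
    }
}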
for (Text value : values) {
    if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
        logger.info("Handling value with ordinal (This is not KV number!): " + vcounter);
    }
    codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input);
    aggs.aggregate(input, needAggrMeasures);
}
aggs.collectStates(result);
The “by-layer” cubing divides one big task into several steps, and each step builds on the previous step’s output, so it can reuse the earlier calculation and avoid recomputing from the very beginning when a failure happens somewhere in between. This makes it a reliable algorithm. When moving to Spark, we decided to keep this algorithm, which is why the feature is called “By-layer Spark Cubing”.
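To make the by-layer idea concrete, here is a small, self-contained toy example (illustration only, not Kylin code): each level is computed purely from the previous level's output by dropping one dimension at a time and re-summing the measure, which is exactly why a failed level can be retried without redoing the earlier levels.

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy "by-layer" aggregation. A cuboid row is keyed by its remaining
// dimension values; the measure is a simple SUM.
public class ByLayerToy {

    // Builds the next level from the previous level only: for every row, drop one
    // dimension at a time and merge the measure into the smaller cuboid's row.
    static Map<String, Long> nextLevel(Map<String, Long> parentLevel) {
        Map<String, Long> child = new HashMap<>();
        for (Map.Entry<String, Long> row : parentLevel.entrySet()) {
            String[] dims = row.getKey().split(",");
            for (int drop = 0; drop < dims.length; drop++) {
                StringBuilder key = new StringBuilder();
                for (int i = 0; i < dims.length; i++) {
                    if (i == drop) continue;
                    if (key.length() > 0) key.append(",");
                    key.append(dims[i]);
                }
                child.merge(key.toString(), row.getValue(), Long::sum);
            }
        }
        return child;
    }

    public static void main(String[] args) {
        // Base cuboid over dimensions (country, city).
        Map<String, Long> base = new LinkedHashMap<>();
        base.put("country=CN,city=SH", 10L);
        base.put("country=CN,city=BJ", 20L);
        base.put("country=US,city=NY", 5L);

        // Prints the level-1 cuboids (order may vary):
        // country=CN -> 30, country=US -> 5, city=SH -> 10, city=BJ -> 20, city=NY -> 5
        System.out.println(nextLevel(base));
    }
}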
Figure 3 is the DAG of cubing in Spark, and it illustrates the process in detail: in “Stage 5”, Kylin uses a HiveContext to read the intermediate Hive table and then does a “map” operation, which is a one-to-one map, to encode the original values into key-value bytes. On completion, Kylin gets an intermediate encoded RDD. In “Stage 6”, the intermediate RDD is aggregated with a “reduceByKey” operation to get RDD-1, which is the base cuboid. Next, a “flatMap” (one-to-many map) is applied to RDD-1, because the base cuboid has N child cuboids. In the same way, the RDDs of all levels get calculated. Each of these RDDs is persisted to the distributed file system on completion, but kept cached in memory for the next level’s calculation; once its child has been generated, it is removed from the cache.
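A condensed sketch of what “Stage 5” and “Stage 6” could look like with Spark’s Java API (a minimal illustration: the table name, the string rowkey, and the COUNT measure are stand-ins for Kylin’s dictionary-encoded keys and measure codecs, not the actual SparkCubingByLayer code):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

// Sketch of "Stage 5" (encode) and "Stage 6" (base cuboid).
public class BaseCuboidSketch {

    public static JavaPairRDD<String, Long> buildBaseCuboid(SparkSession spark, String flatTableName) {
        // Stage 5: read the intermediate Hive flat table.
        Dataset<Row> flatTable = spark.table(flatTableName);

        // One-to-one map: encode every row into a (rowkey, measure) pair.
        JavaPairRDD<String, Long> encoded = flatTable.javaRDD()
                .mapToPair(row -> new Tuple2<>(row.mkString(","), 1L));

        // Stage 6: reduceByKey merges identical rowkeys into the base cuboid RDD.
        return encoded.reduceByKey((a, b) -> a + b);
    }
}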
As we know, an RDD (Resilient Distributed Dataset) is a basic concept in Spark. A collection of N-dimension cuboids can be well described as an RDD, so an N-dimension cube will have N+1 RDDs. These RDDs have a parent/child relationship, as the parent can be used to generate the children. With the parent RDD cached in memory, generating the child RDD can be much more efficient than reading the parent back from disk. Figure 2 describes this process.
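Continuing the sketch above, the level-by-level loop could look roughly like this (again an illustration under assumptions: the drop-one-dimension flatMap mirrors the toy example earlier, and the output paths and storage level are made up, not Kylin’s real implementation):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

// Sketch of the layered RDD pipeline: each level is derived from the previous one,
// the parent stays cached while the child is computed, then the parent is dropped.
public class LayeredCubingSketch {

    public static void buildLevels(JavaPairRDD<String, Long> baseCuboid, int totalLevels, String outputRoot) {
        JavaPairRDD<String, Long> parent = baseCuboid.persist(StorageLevel.MEMORY_AND_DISK_SER());

        for (int level = 1; level <= totalLevels; level++) {
            // One-to-many map: every parent row spawns one row per child cuboid,
            // then identical child rowkeys are merged again.
            JavaPairRDD<String, Long> child = parent
                    .flatMapToPair(LayeredCubingSketch::toChildRows)
                    .reduceByKey((a, b) -> a + b)
                    .persist(StorageLevel.MEMORY_AND_DISK_SER());

            // The finished parent level goes to the distributed file system...
            parent.saveAsTextFile(outputRoot + "/level_" + (level - 1));
            // ...and is evicted from the cache once its child has been generated.
            parent.unpersist();
            parent = child;
        }
        parent.saveAsTextFile(outputRoot + "/level_" + totalLevels);
        parent.unpersist();
    }

    // Same drop-one-dimension idea as the toy example earlier.
    private static Iterator<Tuple2<String, Long>> toChildRows(Tuple2<String, Long> row) {
        String[] dims = row._1().split(",");
        List<Tuple2<String, Long>> out = new ArrayList<>();
        for (int drop = 0; drop < dims.length; drop++) {
            StringBuilder key = new StringBuilder();
            for (int i = 0; i < dims.length; i++) {
                if (i == drop) continue;
                if (key.length() > 0) key.append(",");
                key.append(dims[i]);
            }
            out.add(new Tuple2<>(key.toString(), row._2()));
        }
        return out.iterator();
    }
}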