Pekey's Blog

Notes on the Kylin Cube Build Process

2018/04/20

Reference: https://blog.bcmeng.com


The MapReduce engine computes the cube in batch: its input is a Hive table and its output is HBase KeyValues. The build process consists of the following six steps:

  1. Create the wide Hive flat table (MapReduce)
  2. Compute the cardinality of the columns that need dictionary encoding (MapReduce)
  3. Build the dictionaries (JobServer or MapReduce)
  4. Build the cuboids layer by layer (MapReduce); see the sketch after this list
  5. Convert the cuboids into HBase KeyValues (HFiles) (MapReduce)
  6. Update metadata and perform garbage collection.
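
Before diving into the code, here is a tiny standalone sketch (not Kylin code) of what step 4, "build the cuboids layer by layer", means: an N-dimension cube has 2^N cuboids, each identified by a bitmask over the dimensions, and layer k holds the cuboids that dropped k dimensions from the base cuboid.

import java.util.ArrayList;
import java.util.List;

// Standalone sketch (not Kylin code): enumerate the cuboids of an N-dimension cube by layer.
// A cuboid is identified by a bitmask over the N dimensions; the base cuboid keeps all of them,
// and layer k holds the cuboids that dropped k dimensions from the base cuboid. The real cuboid
// scheduler prunes this full set via aggregation groups and never builds the empty cuboid.
public class CuboidLayers {
    public static void main(String[] args) {
        int n = 4; // e.g. dimensions A, B, C, D
        List<List<Long>> layers = new ArrayList<>();
        for (int k = 0; k <= n; k++) {
            layers.add(new ArrayList<>());
        }
        for (long cuboid = 0; cuboid < (1L << n); cuboid++) {
            int removed = n - Long.bitCount(cuboid); // layer index = number of dimensions removed
            layers.get(removed).add(cuboid);
        }
        for (int k = 0; k <= n; k++) {
            System.out.println("layer " + k + ": " + layers.get(k).size() + " cuboids " + layers.get(k));
        }
    }
}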

Cube Build Process

CubeController

private JobInstance buildInternal(String cubeName, TSRange tsRange, SegmentRange segRange,
        Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd,
        String buildType, boolean force) {
    try {
        String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
        CubeInstance cube = jobService.getCubeManager().getCube(cubeName);

        if (cube == null) {
            throw new InternalErrorException("Cannot find cube " + cubeName);
        }
        return jobService.submitJob(cube, tsRange, segRange, sourcePartitionOffsetStart, sourcePartitionOffsetEnd,
                CubeBuildTypeEnum.valueOf(buildType), force, submitter);
    } catch (Throwable e) {
        logger.error(e.getLocalizedMessage(), e);
        throw new InternalErrorException(e.getLocalizedMessage(), e);
    }
}

JobService

ISource source = SourceFactory.getSource(cube);
SourcePartition src = new SourcePartition(tsRange, segRange, sourcePartitionOffsetStart,
        sourcePartitionOffsetEnd);
src = source.enrichSourcePartitionBeforeBuild(cube, src);
newSeg = getCubeManager().appendSegment(cube, src);
job = EngineFactory.createBatchCubingJob(newSeg, submitter);

EngineFactory

/** Build a new cube segment, typically its time range appends to the end of current cube. */
public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
    return batchEngine(newSegment).createBatchCubingJob(newSegment, submitter);
}

MRBatchCubingEngine2

public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
    return new BatchCubingJobBuilder2(newSegment, submitter).build();
}

BatchCubingJobBuilder2

public CubingJob build() {
    logger.info("MR_V2 new job to BUILD segment " + seg);

    final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
    final String jobId = result.getId();
    final String cuboidRootPath = getCuboidRootPath(jobId);

    // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
    inputSide.addStepPhase1_CreateFlatTable(result);

    // Phase 2: Build Dictionary
    result.addTask(createFactDistinctColumnsStep(jobId));

    if (isEnableUHCDictStep()) {
        result.addTask(createBuildUHCDictStep(jobId));
    }

    result.addTask(createBuildDictionaryStep(jobId));
    result.addTask(createSaveStatisticsStep(jobId));
    outputSide.addStepPhase2_BuildDictionary(result);

    // Phase 3: Build Cube
    addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
    addInMemCubingSteps(result, jobId, cuboidRootPath); // inmem cubing, only selected algorithm will execute
    outputSide.addStepPhase3_BuildCube(result);

    // Phase 4: Update Metadata & Cleanup
    result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
    inputSide.addStepPhase4_Cleanup(result);
    outputSide.addStepPhase4_Cleanup(result);

    return result;
}
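
All of the addTask() calls above only assemble the job; nothing runs yet. The returned DefaultChainedExecutable is later picked up by the job scheduler, its sub-tasks are executed one after another, and a failed build can later be resumed from the failed step. A minimal sketch of that idea (my own simplification, not Kylin's executable framework):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Minimal sketch (not Kylin's DefaultChainedExecutable/DefaultScheduler): a chained job is an
// ordered list of steps that run one after another, and a step only starts once the previous
// one has succeeded.
class ChainedJobSketch {
    private final List<Callable<Boolean>> steps = new ArrayList<>();

    void addTask(Callable<Boolean> step) {
        steps.add(step);
    }

    boolean run() throws Exception {
        for (Callable<Boolean> step : steps) {
            if (!step.call()) {
                return false;
            }
        }
        return true;
    }
}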

Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables

BatchCubingJobBuilder2

// Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
inputSide.addStepPhase1_CreateFlatTable(result);

HiveMRInput

public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
    final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
    final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName)
            .getConfig();
    final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);

    // create flat table first
    addStepPhase1_DoCreateFlatTable(jobFlow);

    // then count and redistribute
    if (cubeConfig.isHiveRedistributeEnabled()) {
        jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName));
    }

    // special for hive
    addStepPhase1_DoMaterializeLookupTable(jobFlow);
}
private AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir,
        String cubeName) {
    //from hive to hive
    final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
    final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
    String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);

    CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
    step.setInitStatement(hiveInitStatements);
    step.setCreateTableStatement(dropTableHql + createTableHql + insertDataHqls);
    CubingExecutableUtil.setCubeName(cubeName, step.getParams());
    step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
    return step;
}
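
For intuition, the three generate*Statement() calls above produce HQL along the following lines. This is a hand-written simplification: the table and column names and the job path are made up, and the real JoinedFlatTable output additionally handles data types, the configured storage format, and the segment's partition/filter conditions.

// Hand-written sketch of the kind of HQL the three generate*Statement() calls produce.
// All names (kylin_intermediate_my_cube, FACT_TABLE, LOOKUP_TABLE, the columns, the path)
// are illustrative only.
class FlatTableHqlSketch {
    static final String DROP_TABLE_HQL =
            "DROP TABLE IF EXISTS kylin_intermediate_my_cube;\n";
    static final String CREATE_TABLE_HQL =
            "CREATE EXTERNAL TABLE IF NOT EXISTS kylin_intermediate_my_cube\n"
          + "(DT string, SELLER_ID string, PRICE decimal(19,4))\n"
          + "STORED AS SEQUENCEFILE\n"
          + "LOCATION '/kylin/kylin_metadata/kylin-<job_id>/kylin_intermediate_my_cube';\n";
    static final String INSERT_DATA_HQL =
            "INSERT OVERWRITE TABLE kylin_intermediate_my_cube\n"
          + "SELECT FACT.DT, FACT.SELLER_ID, FACT.PRICE\n"
          + "FROM DEFAULT.FACT_TABLE FACT\n"
          + "INNER JOIN DEFAULT.LOOKUP_TABLE LOOKUP ON FACT.SELLER_ID = LOOKUP.SELLER_ID\n"
          + "WHERE FACT.DT >= '2018-01-01' AND FACT.DT < '2018-02-01';\n";
}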

CreateFlatHiveTableStep

protected void createFlatHiveTable(KylinConfig config) throws IOException {
    final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
    hiveCmdBuilder.overwriteHiveProps(config.getHiveConfigOverride());
    hiveCmdBuilder.addStatement(getInitStatement());
    hiveCmdBuilder.addStatementWithRedistributeBy(getCreateTableStatement());
    final String cmd = hiveCmdBuilder.toString();

    stepLogger.log("Create and distribute table, cmd: ");
    stepLogger.log(cmd);

    Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
    Map<String, String> info = stepLogger.getInfo();

    //get the flat Hive table size
    Matcher matcher = HDFS_LOCATION.matcher(cmd);
    if (matcher.find()) {
        String hiveFlatTableHdfsUrl = matcher.group(1);
        long size = getFileSize(hiveFlatTableHdfsUrl);
        info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, "" + size);
        logger.info("HDFS_Bytes_Writen: " + size);
    }
    getManager().addJobInfo(getId(), info);
    if (response.getFirst() != 0) {
        throw new RuntimeException("Failed to create flat hive table, error code " + response.getFirst());
    }
}

Phase 2: Build Dictionary

Phase 3: Build Cube

BatchCubingJobBuilder2

// Phase 3: Build Cube
addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
addInMemCubingSteps(result, jobId, cuboidRootPath); // inmem cubing, only selected algorithm will execute
outputSide.addStepPhase3_BuildCube(result);

protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
    // Don't know statistics so that tree cuboid scheduler is not determined. Determine the maxLevel at runtime
    final int maxLevel = CuboidUtil.getLongestDepth(seg.getCuboidScheduler().getAllCuboidIds());
    // base cuboid step
    result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId));
    // n dim cuboid steps
    for (int i = 1; i <= maxLevel; i++) {
        result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i - 1),
                getCuboidOutputPathsByLevel(cuboidRootPath, i), i, jobId));
    }
}
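
The wiring between levels is pure path plumbing: the output directory of level i-1 becomes the input of level i. A hypothetical stand-in for getCuboidOutputPathsByLevel() makes that concrete (the real directory naming in Kylin differs):

// Hypothetical stand-in for getCuboidOutputPathsByLevel(); the real naming scheme differs,
// but the idea is the same: level i reads exactly the directory that level i-1 wrote.
class CuboidPathSketch {
    static String cuboidOutputPathByLevel(String cuboidRootPath, int level) {
        return level == 0
                ? cuboidRootPath + "/base_cuboid"
                : cuboidRootPath + "/level_" + level + "_cuboid";
    }
}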
private MapReduceExecutable createBaseCuboidStep(String cuboidOutputPath, String jobId) {
    // base cuboid job
    MapReduceExecutable baseCuboidStep = new MapReduceExecutable();

    StringBuilder cmd = new StringBuilder();
    appendMapReduceParameters(cmd);

    baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);

    appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
    appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
    appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, "FLAT_TABLE"); // marks flat table input
    appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputPath);
    appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
            "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
    appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "0");
    appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);

    baseCuboidStep.setMapReduceParams(cmd.toString());
    baseCuboidStep.setMapReduceJobClass(getBaseCuboidJob());
    // baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
    return baseCuboidStep;
}
private MapReduceExecutable createNDimensionCuboidStep(String parentPath, String outputPath, int level, String jobId) {
    // ND cuboid job
    MapReduceExecutable ndCuboidStep = new MapReduceExecutable();

    ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : level " + level);
    StringBuilder cmd = new StringBuilder();

    appendMapReduceParameters(cmd);
    appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
    appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
    appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, parentPath);
    appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
    appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
            "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
    appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "" + level);
    appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);

    ndCuboidStep.setMapReduceParams(cmd.toString());
    ndCuboidStep.setMapReduceJobClass(getNDCuboidJob());
    return ndCuboidStep;
}

MR Cube Build


Generating the Base Cuboid from the Hive Table

During an actual cube build, Kylin first generates a wide flat table from the cube's Hive fact table and lookup tables, then computes the cardinality of the flat table's columns, builds the dimension dictionaries, estimates the size of each cuboid, creates the HBase table for the cube, and finally computes the base cuboid.
Computing the base cuboid is itself a MapReduce job: its input is the Hive flat table mentioned above, its output key is the combination of dimension values, and its output value is the measure values taken from the flat table.
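
The sketch below shows, in deliberately simplified form, what such a record looks like. It is not Kylin's BaseCuboidBuilder: the real row key also carries a shard prefix and fixed-length encoded dimension values, and the measures are serialized through the cube's measure codec rather than plain longs.

import java.nio.ByteBuffer;

// Deliberately simplified sketch of the shape of one base-cuboid record
// (not Kylin's RowKeyEncoder/BufferedMeasureCodec).
class BaseCuboidRecordSketch {

    // key = [cuboid id][dictionary id of each dimension value ...]
    static byte[] buildKey(long baseCuboidId, int[] encodedDims) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + encodedDims.length * Integer.BYTES);
        buf.putLong(baseCuboidId);
        for (int dim : encodedDims) {
            buf.putInt(dim);
        }
        return buf.array();
    }

    // value = [serialized measure values ...], e.g. SUM(price) and COUNT(*)
    static byte[] buildValue(long sumPrice, long count) {
        ByteBuffer buf = ByteBuffer.allocate(2 * Long.BYTES);
        buf.putLong(sumPrice);
        buf.putLong(count);
        return buf.array();
    }
}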

BaseCuboidJob
mapper: HiveToBaseCuboidMapper

public void doMap(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
    Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(value);
    for (String[] row : rowCollection) {
        try {
            outputKV(row, context);
        } catch (Exception ex) {
            handleErrorRecord(row, ex);
        }
    }
}

HiveTableReader

public static String[] getRowAsStringArray(HCatRecord record) {
    String[] arr = new String[record.size()];
    for (int i = 0; i < arr.length; i++) {
        Object o = record.get(i);
        arr[i] = (o == null) ? null : o.toString();
    }
    return arr;
}

BaseCuboidMapperBase

protected void outputKV(String[] flatRow, Context context) throws IOException, InterruptedException {
    byte[] rowKey = baseCuboidBuilder.buildKey(flatRow);
    outputKey.set(rowKey, 0, rowKey.length);

    ByteBuffer valueBuf = baseCuboidBuilder.buildValue(flatRow);
    outputValue.set(valueBuf.array(), 0, valueBuf.position());
    context.write(outputKey, outputValue);
}


Computing Cuboids Layer by Layer from the Base Cuboid

Starting from the base cuboid, each layer of cuboids is computed by another MapReduce job: in the map phase the number of dimensions decreases by one per level, and in the reduce phase the measures are aggregated (see the sketch below).
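
Concretely, the mapper of each level derives child-cuboid rows from a parent row by clearing one dimension bit in the cuboid id and dropping that dimension's value from the key; the measures are passed through untouched and aggregated in the reducer. Below is a much simplified sketch of that key shrinking, reusing the toy layout from the earlier base-cuboid sketch (the real NDCuboidBuilder only emits the children permitted by the cuboid scheduler):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Much simplified sketch (not Kylin's NDCuboidBuilder): derive child-cuboid keys from a parent key
// by clearing one dimension bit in the cuboid id and dropping that dimension's value from the key.
class NDCuboidSketch {

    static List<byte[]> childKeys(long parentCuboidId, int[] parentDims) {
        List<byte[]> children = new ArrayList<>();
        for (int drop = 0; drop < parentDims.length; drop++) {
            long childCuboidId = clearNthSetBitFromHigh(parentCuboidId, drop);
            ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + (parentDims.length - 1) * Integer.BYTES);
            buf.putLong(childCuboidId);
            for (int i = 0; i < parentDims.length; i++) {
                if (i != drop) {
                    buf.putInt(parentDims[i]); // keep the remaining dimension values in order
                }
            }
            children.add(buf.array());
        }
        return children;
    }

    // the n-th (from the highest) set bit of the cuboid id corresponds to the n-th dimension in the key
    static long clearNthSetBitFromHigh(long cuboidId, int n) {
        for (int bit = 63; bit >= 0; bit--) {
            if ((cuboidId & (1L << bit)) != 0 && n-- == 0) {
                return cuboidId & ~(1L << bit);
            }
        }
        throw new IllegalArgumentException("cuboid id has fewer set bits than expected");
    }
}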

reducer: CuboidReducer

public void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    aggs.reset();

    for (Text value : values) {
        if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
            logger.info("Handling value with ordinal (This is not KV number!): " + vcounter);
        }
        codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input);
        aggs.aggregate(input, needAggrMeasures);
    }
    aggs.collectStates(result);

    ByteBuffer valueBuf = codec.encode(result);

    outputValue.set(valueBuf.array(), 0, valueBuf.position());
    context.write(key, outputValue);
}

Converting Cuboids to HBase HFiles
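
Each cuboid row is already a key/value pair, so this step is essentially the standard HBase bulk-load pattern: an MR job writes the rows as region-partitioned HFiles, which are then handed over to the region servers. Below is a minimal sketch of that generic pattern using the HBase 1.x API; it is not Kylin's actual CubeHFileJob / BulkLoadJob, and the mapper that turns cuboid rows into KeyValue objects is omitted.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;

// Minimal sketch of the generic HBase 1.x bulk-load pattern this step is built on
// (not Kylin's CubeHFileJob/BulkLoadJob): an MR job writes region-partitioned HFiles
// via HFileOutputFormat2, then the files are handed over to the region servers.
public class HFileBulkLoadSketch {

    public static void run(String hfileDir, String hbaseTableName) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "sketch_convert_cuboid_to_hfile");
        // the mapper (omitted here) turns every cuboid row into (rowkey, KeyValue)
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);

        TableName tableName = TableName.valueOf(hbaseTableName);
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(tableName);
             RegionLocator locator = conn.getRegionLocator(tableName)) {
            // sorts and partitions the output so that each HFile falls into exactly one region
            HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
            HFileOutputFormat2.setOutputPath(job, new Path(hfileDir));
            if (job.waitForCompletion(true)) {
                // move the generated HFiles into the regions
                new LoadIncrementalHFiles(conf).doBulkLoad(new Path(hfileDir), conn.getAdmin(), table, locator);
            }
        }
    }
}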


Spark Cube Build

The “by layer” cubing divides a big task into a number of steps, and each step is based on the previous step's output, so it can reuse earlier calculations and avoid recomputing from the very beginning when a failure happens in between. This makes it a reliable algorithm. When moving to Spark, we decided to keep this algorithm; that is why we call this feature “By-layer Spark Cubing”.

Building the cube layer by layer splits one big job into several steps, and every step is computed from the previous step's output, so intermediate results can be reused and a failure does not force the build to start over from scratch.

Figure 3 is the DAG of cubing in Spark; it illustrates the process in detail: in “Stage 5”, Kylin uses a HiveContext to read the intermediate Hive table and then does a “map” operation, a one-to-one map, to encode the original values into key-value bytes. On completion, Kylin gets an intermediate encoded RDD. In “Stage 6”, the intermediate RDD is aggregated with a “reduceByKey” operation to get RDD-1, which is the base cuboid. Next, a “flatMap” (one-to-many map) is performed on RDD-1, because the base cuboid has N child cuboids, and so on until all levels' RDDs are calculated. These RDDs are persisted to the distributed file system on completion, but also cached in memory for the next level's calculation; once its children have been generated, a cached RDD is removed from the cache.

As we know, RDD (Resilient Distributed Dataset) is a basic concept in Spark. A collection of N-dimension cuboids can be well described as an RDD, and an N-dimension cube will have N+1 RDDs. These RDDs have a parent/child relationship, as the parent can be used to generate the children. With the parent RDD cached in memory, generating the child RDD can be much more efficient than reading from disk. Figure 2 describes this process.
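
Below is a minimal sketch of that by-layer loop with Spark's Java API (Spark 2.x signatures), matching the map / reduceByKey / flatMap / persist steps described above. It is not Kylin's SparkCubingByLayer: encodeToKV, spawnChildren and mergeMeasures are placeholders, the HDFS paths are made up, and the RowKey wrapper only exists because a raw byte[] cannot serve as a Spark key.

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

// Minimal sketch of the "by layer" loop with Spark's Java API; not Kylin's SparkCubingByLayer.
public class ByLayerCubingSketch {

    // content-based key wrapper: raw byte[] must not be used as a Spark key (arrays hash by identity)
    static final class RowKey implements Serializable {
        final byte[] bytes;
        RowKey(byte[] bytes) { this.bytes = bytes; }
        @Override public boolean equals(Object o) { return o instanceof RowKey && Arrays.equals(bytes, ((RowKey) o).bytes); }
        @Override public int hashCode() { return Arrays.hashCode(bytes); }
    }

    public static void build(JavaSparkContext sc, int totalLevels, String outputRoot) {
        // base cuboid: encode every flat-table row into (rowkey, measures), then aggregate by key
        JavaPairRDD<RowKey, byte[]> parent = sc.textFile("hdfs:///path/to/flat_table")
                .mapToPair(ByLayerCubingSketch::encodeToKV)        // one-to-one map
                .reduceByKey(ByLayerCubingSketch::mergeMeasures);
        parent.persist(StorageLevel.MEMORY_AND_DISK_SER());
        parent.saveAsObjectFile(outputRoot + "/level_0");

        for (int level = 1; level <= totalLevels; level++) {
            // one-to-many map: each parent row spawns its child-cuboid rows, then aggregate again
            JavaPairRDD<RowKey, byte[]> child = parent
                    .flatMapToPair(ByLayerCubingSketch::spawnChildren)
                    .reduceByKey(ByLayerCubingSketch::mergeMeasures);
            child.persist(StorageLevel.MEMORY_AND_DISK_SER());
            child.saveAsObjectFile(outputRoot + "/level_" + level);
            parent.unpersist();                                    // parent no longer needed once its children exist
            parent = child;
        }
        parent.unpersist();
    }

    // placeholders standing in for Kylin's real encoding, cuboid spawning and measure aggregation
    static Tuple2<RowKey, byte[]> encodeToKV(String flatTableRow) { return new Tuple2<>(new RowKey(new byte[0]), new byte[0]); }
    static Iterator<Tuple2<RowKey, byte[]>> spawnChildren(Tuple2<RowKey, byte[]> parentRow) { return Collections.emptyIterator(); }
    static byte[] mergeMeasures(byte[] v1, byte[] v2) { return v1; }
}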

Phase 4: Update Metadata & Cleanup

Other Code

CATALOG
  1. Cube Build Process
    1.1. Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
    1.2. Phase 2: Build Dictionary
    1.3. Phase 3: Build Cube
      1.3.1. MR Cube Build
      1.3.2. Spark Cube Build
      1.3.3. Phase 4: Update Metadata & Cleanup
    1.4. Other Code