Dataset
Suppose we have two streaming datasets in different Kafka topics (source and target), and we need to know how good the data quality of the target dataset is, measured against the source dataset.
For simplicity, assume the data in both topics are JSON strings, like the following:
{"id": 1, "name": "Apple", "color": "red", "time": "2018-09-12_06:00:00"}
{"id": 2, "name": "Banana", "color": "yellow", "time": "2018-09-12_06:01:00"}
...
Environment Preparation
Prepare the environment for the Apache Griffin measure module, including the following components:
- JDK (1.8+)
- Hadoop (2.6.0+)
- Spark (2.2.1+)
- Kafka (0.8.x)
- Zookeeper (3.5+)
For detailed configuration steps for these components, refer to griffin/griffin-doc/deploy; this article assumes they are all already set up.
For version compatibility information, see https://github.com/apache/griffin/blob/master/griffin-doc/deploy/measure-build-guide.md
Build the Apache Griffin Measure Module
1. Download the Apache Griffin source package here.
2. Unzip the source package.
unzip griffin-0.4.0-source-release.zip
cd griffin-0.4.0-source-release
3. Build the Apache Griffin jar
mvn clean install
and move the built Apache Griffin jar into your work path
mv measure/target/measure-0.4.0.jar <work path>/griffin-measure.jar
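As a quick sanity check, you can confirm that the measure entry class used later by spark-submit is packaged in the jar (a minimal sketch, assuming unzip is available):
# verify the main class is present in the built jar
unzip -l <work path>/griffin-measure.jar | grep "org/apache/griffin/measure/Application"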
Data Preparation
For a quick start, we use the Kafka shell scripts to create the two Kafka topics (source and target) and produce data for them in JSON string format.
# create topics
# Note: these commands only work for Kafka 0.8
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic source
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic target
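To confirm both topics were created, you can describe them with the same Kafka shell tools (an optional sanity check):
# verify the topics exist
kafka-topics.sh --describe --zookeeper localhost:2181 --topic source
kafka-topics.sh --describe --zookeeper localhost:2181 --topic target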
The data format looks like this:
{"id": 1, "name": "Apple", "color": "red", "time": "2018-09-12_06:00:00"}
{"id": 2, "name": "Banana", "color": "yellow", "time": "2018-09-12_06:01:00"}
The source and target topics may each contain some data that the other does not. You can download the demo data and run the ./streaming-data.sh script to generate the JSON string data files and produce them into the Kafka topics, as sketched below.
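If the demo data is not at hand, here is a minimal sketch of feeding a few records into the topics with the Kafka console producer; the broker address and record values are illustrative:
# produce sample json records into the source topic
kafka-console-producer.sh --broker-list localhost:9092 --topic source <<EOF
{"id": 1, "name": "Apple", "color": "red", "time": "2018-09-12_06:00:00"}
{"id": 2, "name": "Banana", "color": "yellow", "time": "2018-09-12_06:01:00"}
EOF
# produce a partially different set into the target topic, so accuracy is not 100%
kafka-console-producer.sh --broker-list localhost:9092 --topic target <<EOF
{"id": 1, "name": "Apple", "color": "red", "time": "2018-09-12_06:00:00"}
EOF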
Define the Data Quality Measure
Apache Griffin environment configuration
Environment configuration file: env.json
{
"spark": {
"log.level": "WARN",
"checkpoint.dir": "hdfs:///griffin/checkpoint",
"batch.interval": "20s",
"process.interval": "1m",
"init.clear": true,
"config": {
"spark.default.parallelism": 4,
"spark.task.maxFailures": 5,
"spark.streaming.kafkaMaxRatePerPartition": 1000,
"spark.streaming.concurrentJobs": 4,
"spark.yarn.maxAppAttempts": 5,
"spark.yarn.am.attemptFailuresValidityInterval": "1h",
"spark.yarn.max.executor.failures": 120,
"spark.yarn.executor.failuresValidityInterval": "1h",
"spark.hadoop.fs.hdfs.impl.disable.cache": true
}
},
"sinks": [
{
"type": "console"
},
{
"type": "hdfs",
"config": {
"path": "hdfs:///griffin/persist"
}
},
{
"type": "elasticsearch",
"config": {
"method": "post",
"api": "http://es:9200/griffin/accuracy"
}
}
],
"griffin.checkpoint": [
{
"type": "zk",
"config": {
"hosts": "zk:2181",
"namespace": "griffin/infocache",
"lock.path": "lock",
"mode": "persist",
"init.clear": true,
"close.clear": false
}
}
]
}
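The HDFS paths referenced in this configuration (the Spark checkpoint directory and the hdfs sink path) should exist and be writable before the job starts; a minimal sketch, assuming the hdfs client is on the PATH:
# create the directories used by checkpointing and the hdfs sink
hdfs dfs -mkdir -p /griffin/checkpoint
hdfs dfs -mkdir -p /griffin/persist
hdfs dfs -mkdir -p /griffin/streaming/dump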
Define the Griffin Data Quality (DQ) measure
DQ configuration file: dq.json
{
"name": "streaming_accu",
"process.type": "streaming",
"data.sources": [
{
"name": "src",
"baseline": true,
"connectors": [
{
"type": "kafka",
"version": "0.8",
"config": {
"kafka.config": {
"bootstrap.servers": "kafka:9092",
"group.id": "griffin",
"auto.offset.reset": "largest",
"auto.commit.enable": "false"
},
"topics": "source",
"key.type": "java.lang.String",
"value.type": "java.lang.String"
},
"pre.proc": [
{
"dsl.type": "df-opr",
"rule": "from_json"
}
]
}
],
"checkpoint": {
"type": "json",
"file.path": "hdfs:///griffin/streaming/dump/source",
"info.path": "source",
"ready.time.interval": "10s",
"ready.time.delay": "0",
"time.range": ["-5m", "0"],
"updatable": true
}
}, {
"name": "tgt",
"connectors": [
{
"type": "kafka",
"version": "0.8",
"config": {
"kafka.config": {
"bootstrap.servers": "kafka:9092",
"group.id": "griffin",
"auto.offset.reset": "largest",
"auto.commit.enable": "false"
},
"topics": "target",
"key.type": "java.lang.String",
"value.type": "java.lang.String"
},
"pre.proc": [
{
"dsl.type": "df-opr",
"rule": "from_json"
}
]
}
],
"checkpoint": {
"type": "json",
"file.path": "hdfs:///griffin/streaming/dump/target",
"info.path": "target",
"ready.time.interval": "10s",
"ready.time.delay": "0",
"time.range": ["-1m", "0"]
}
}
],
"evaluate.rule": {
"rules": [
{
"dsl.type": "griffin-dsl",
"dq.type": "accuracy",
"out.dataframe.name": "accu",
"rule": "src.id = tgt.id AND src.name = tgt.name AND src.color = tgt.color AND src.time = tgt.time",
"details": {
"source": "src",
"target": "tgt",
"miss": "miss_count",
"total": "total_count",
"matched": "matched_count"
},
"out":[
{
"type":"metric",
"name": "accu"
},
{
"type":"record",
"name": "missRecords"
}
]
}
]
},
"sinks": ["CONSOLE", "HDFS"]
}
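Before submitting the job, it can help to verify that both configuration files are well-formed JSON; one way to do this, assuming Python is installed:
# fail fast on malformed json
python -m json.tool env.json > /dev/null
python -m json.tool dq.json > /dev/null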
Measure Data Quality
Submit the measure job to Spark, with the two configuration file paths as application arguments.
spark-submit --class org.apache.griffin.measure.Application --master yarn --deploy-mode client --queue default \
--driver-memory 1g --executor-memory 1g --num-executors 3 \
<path>/griffin-measure.jar \
<path>/env.json <path>/dq.json
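Since the job runs on YARN in client mode, you can confirm it is up with the YARN CLI (the application name shown depends on your setup):
# list running applications; the griffin measure job should appear
yarn application -list -appStates RUNNING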
Report Data Quality Metrics
You can watch the calculation logs in the console; while the job runs, the result metrics are printed once per minute. The results are also saved in HDFS under hdfs:///griffin/persist/<job name>/, listed in separate directories named after the timestamps of the calculation tasks.
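To browse the persisted results, you can list the persist path directly; a sketch assuming the job name from dq.json (streaming_accu) is used as the directory name:
# list result directories, one per calculation timestamp
hdfs dfs -ls /griffin/persist/streaming_accu/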
Refine the Data Quality Report
Based on the results and your actual business needs, you can further refine the data quality measurement.
For the detailed meaning of each measure configuration parameter, refer to griffin/griffin-doc/measure.