天天看点

三. Apache Griffin基于Spark Streaming流数据质量监控实战

数据集

假设我们在不同的 kafka 主题(源、目标)中有两个流数据集,我们需要根据源数据集知道目标数据集的数据质量如何。

为简单起见,假设两个主题的数据都是 json 字符串,如下所示

{"id": 1, "name": "Apple", "color": "red", "time": "2018-09-12_06:00:00"}
{"id": 2, "name": "Banana", "color": "yellow", "time": "2018-09-12_06:01:00"}
...
           

环境准备

为 Apache Griffin 测量模块准备环境,包括以下组件:

  • JDK (1.8+)
  • Hadoop (2.6.0+)
  • Spark (2.2.1+)
  • Kafka (0.8.x)
  • Zookeeper (3.5+)

有关以上组件的详细的配置过程,可以参考griffin/griffin-doc/deploy,本文假定以上环境均已配置完毕。

有关版本匹配的信息,可参考https://github.com/apache/griffin/blob/master/griffin-doc/deploy/measure-build-guide.md

构建 Apache Griffin 测量模块

1.在此处下载 Apache Griffin 源包。

2.解压源包。

unzip griffin-0.4.0-source-release.zip
cd griffin-0.4.0-source-release
           

3.构建 Apache Griffin jar

mvn clean install
           

并将构建的 apache griffin jar包移动到项目路径中

mv measure/target/measure-0.4.0.jar <work path>/griffin-measure.jar
           

数据准备

为了快速开始,我们利用kafka shell创建两个 kafka 主题(源、目标)并为它们生成 json 字符串格式的数据。

# create topics
# Note: it just works for kafka 0.8
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic source
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic target
           

数据格式类似于这样:

{"id": 1, "name": "Apple", "color": "red", "time": "2018-09-12_06:00:00"}
{"id": 2, "name": "Banana", "color": "yellow", "time": "2018-09-12_06:01:00"}
           

对于主题源和目标,彼此之间可能有一些不同的数据。 可以下载demo数据并执行 ./streaming-data.sh 脚本生成json字符串数据文件,并将它们生成到kafka主题中。

定义数据质量指标

Apache Griffin环境配置

环境配置文件:env.json

{
  "spark": {
    "log.level": "WARN",
    "checkpoint.dir": "hdfs:///griffin/checkpoint",
    "batch.interval": "20s",
    "process.interval": "1m",
    "init.clear": true,
    "config": {
      "spark.default.parallelism": 4,
      "spark.task.maxFailures": 5,
      "spark.streaming.kafkaMaxRatePerPartition": 1000,
      "spark.streaming.concurrentJobs": 4,
      "spark.yarn.maxAppAttempts": 5,
      "spark.yarn.am.attemptFailuresValidityInterval": "1h",
      "spark.yarn.max.executor.failures": 120,
      "spark.yarn.executor.failuresValidityInterval": "1h",
      "spark.hadoop.fs.hdfs.impl.disable.cache": true
    }
  },
  "sinks": [
    {
      "type": "console"
    },
    {
      "type": "hdfs",
      "config": {
        "path": "hdfs:///griffin/persist"
      }
    },
    {
      "type": "elasticsearch",
      "config": {
        "method": "post",
        "api": "http://es:9200/griffin/accuracy"
      }
    }
  ],
  "griffin.checkpoint": [
    {
      "type": "zk",
      "config": {
        "hosts": "zk:2181",
        "namespace": "griffin/infocache",
        "lock.path": "lock",
        "mode": "persist",
        "init.clear": true,
        "close.clear": false
      }
    }
  ]
}
           

定义griffin数据质量(DQ)

DQ配置文件:dq.json

{
  "name": "streaming_accu",
  "process.type": "streaming",
  "data.sources": [
    {
      "name": "src",
      "baseline": true,
      "connectors": [
        {
          "type": "kafka",
          "version": "0.8",
          "config": {
            "kafka.config": {
              "bootstrap.servers": "kafka:9092",
              "group.id": "griffin",
              "auto.offset.reset": "largest",
              "auto.commit.enable": "false"
            },
            "topics": "source",
            "key.type": "java.lang.String",
            "value.type": "java.lang.String"
          },
          "pre.proc": [
            {
              "dsl.type": "df-opr",
              "rule": "from_json"
            }
          ]
        }
      ],
      "checkpoint": {
        "type": "json",
        "file.path": "hdfs:///griffin/streaming/dump/source",
        "info.path": "source",
        "ready.time.interval": "10s",
        "ready.time.delay": "0",
        "time.range": ["-5m", "0"],
        "updatable": true
      }
    }, {
      "name": "tgt",
      "connectors": [
        {
          "type": "kafka",
          "version": "0.8",
          "config": {
            "kafka.config": {
              "bootstrap.servers": "kafka:9092",
              "group.id": "griffin",
              "auto.offset.reset": "largest",
              "auto.commit.enable": "false"
            },
            "topics": "target",
            "key.type": "java.lang.String",
            "value.type": "java.lang.String"
          },
          "pre.proc": [
            {
              "dsl.type": "df-opr",
              "rule": "from_json"
            }
          ]
        }
      ],
      "checkpoint": {
        "type": "json",
        "file.path": "hdfs:///griffin/streaming/dump/target",
        "info.path": "target",
        "ready.time.interval": "10s",
        "ready.time.delay": "0",
        "time.range": ["-1m", "0"]
      }
    }
  ],
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "accuracy",
        "out.dataframe.name": "accu",
        "rule": "src.id = tgt.id AND src.name = tgt.name AND src.color = tgt.color AND src.time = tgt.time",
        "details": {
          "source": "src",
          "target": "tgt",
          "miss": "miss_count",
          "total": "total_count",
          "matched": "matched_count"
        },
        "out":[
          {
            "type":"metric",
            "name": "accu"
          },
          {
            "type":"record",
            "name": "missRecords"
          }
        ]
      }
    ]
  },
  "sinks": ["CONSOLE", "HDFS"]
}
           

测量数据质量

将测量作业提交到 Spark,以配置文件路径作为参数。

spark-submit --class org.apache.griffin.measure.Application --master yarn --deploy-mode client --queue default \
--driver-memory 1g --executor-memory 1g --num-executors 3 \
<path>/griffin-measure.jar \
<path>/env.json <path>/dq.json
           

报告数据质量指标

在控制台中可以获取计算日志,当作业运行时,可得到每分钟打印的结果指标。 相关的结果也会保存在hdfs中:hdfs:///griffin/persist//,并且在以计算任务的时间戳命名的不同目录中列出。

优化数据质量报告

还可以根据结果,以及实际业务需要,进一步改进数据质量度量

有关度量指标的详细的各项配置参数的含义,可以参考griffin/griffin-doc/measure

继续阅读