天天看点

如何通过 Json Schema Validation 保障数据质量——附代码示例

为什么要使用 Json Schema Validation

在大数据时代,数据的数量和复杂性不断增加,数据质量的管理变得尤为重要。数据质量的好坏直接影响到决策的准确性和效果。对数据质量的管控,一般可以通过以下几个方面进行:

  1. 确定数据质量目标。根据业务需求和数据使用场景,确定数据质量目标,包括数据准确性、完整性、一致性、可靠性、及时性和安全性等方面。
  2. 定义数据质量指标。根据数据质量目标,定义数据质量指标,例如数据错误率、数据重复率、数据缺失率、数据一致性等,以便对数据质量进行量化评估。
  3. 实施数据质量监控。通过数据质量监控工具和技术手段,对数据进行实时监控和分析,及时发现数据质量问题,并进行处理。
  4. 实施数据质量评估。通过数据质量评估工具和技术手段,对数据进行定期评估,评估数据质量是否符合预期目标,并及时发现数据质量问题,进行改进和优化。
  5. 建立数据质量治理体系。建立数据质量治理体系,包括数据质量管理流程、数据质量管理规范、数据质量管理组织和数据质量管理工具等,以确保数据质量的持续改进和监管。

本文围绕 实施数据质量监控 这个方面,在实际操作层面,介绍如何通过 Json Schema Validation 来保障数据质量。在文章的第 3 部分和第 4 部分,分别介绍 Java 和 Flink 的代码示例,方便读者理解和使用。

如何通过 Json Schema Validation 保障数据质量——附代码示例

什么是 Json Schema Validation

原始日志的格式通常使用 json 格式进行上报和接收的,方便扩展、使用灵活。日志上报和接收过程中,难免会出现日志格式错误、字段类型错误、字段缺失等问题。如果有一种方法,能够识别出日志的问题,自动发现错误并且告警通知到相关同学,能够降低错误日志的处理成本,提升效率。Json Schema Validation 能够完成这项工作。

在介绍 Json Schema Validation 之前,你可能已经听过 Json Validation 。 那么,Json Schema Validation 和 Json Validation 有什么区别呢?

JSON Validation 适用于简单的数据验证场景,主要验证 JSON 数据的语法是否正确,如{} [] 语法是否匹配、值是否是有效的类型(字符串、数字等)。

JSON Schema Validation 更适用于复杂的数据验证场景,对 JSON schema 中定义的规则进行验证。例如验证数据结构、关系和约束等。可以在数据处理、分析、挖掘等环节中使用,以确保数据的正确性和一致性。

相比 JSON Validation,JSON Schema Validation 提供了更加精细、更加个性化的数据验证功能。

如何通过 Json Schema Validation 保障数据质量——附代码示例

如何使用 Java 进行 Json Schema Validation

1.日志验证效果介绍

在正式介绍代码之前,先看下面 2 张图,分别是对正确的日志和错误的日志,进行 JSON Schema Validation 的结果。第 1 张图是处理正确的日志,程序运行之后,没有报错;第 2 张图是处理错误的日志,程序运行之后,不但报错了,还提示:“日志中少了 id 字段”。

如何通过 Json Schema Validation 保障数据质量——附代码示例

通过校验的日志

如何通过 Json Schema Validation 保障数据质量——附代码示例

通不过校验的日志

2.Java 代码介绍

步骤一:引入 everit-json-schema maven 依赖。

<dependency>
    <groupId>com.github.erosb</groupId>
    <artifactId>everit-json-schema</artifactId>
    <version>1.14.2</version>
</dependency>           

步骤二:定义 json schema 。通过定义 json schema,能够定义 json 日志的以下规则:

  • json 日志中包含哪些字段
  • 字段的层级结构是怎样的
  • 每个字段的类型什么
  • 字段的最大最小值是什么
  • 哪些字段是必选的,哪些字段是可选的

json schema 本身也是一个 json,如下所示:

{
  "type": "object",
  "properties": {
    "data": {
      "type": "object",
      "properties": {
        "id": {
          "type": "integer",
          "minimum": 0
        },
        "key": {
          "type": "string"
        }
      },
      "required": [
        "id",
        "key"
      ]
    }
  },
  "required": [
    "data"
  ]
}           

步骤三:引入 everit-json-schema 库,定义了 json shcema 之后,就可以编写 Java 代码了。

import lombok.extern.slf4j.Slf4j;
import org.everit.json.schema.Schema;
import org.everit.json.schema.ValidationException;
import org.everit.json.schema.loader.SchemaLoader;
import org.json.JSONObject;

import javax.swing.text.StringContent;

@Slf4j
public class JsonValidationDemo {
    public static void main(String[] args) {
        new JsonValidationDemo().process();
    }

    private void process() {
        String jsonSchema = "{\"type\":\"object\",\"properties\":{\"data\":{\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"integer\",\"minimum\":0},\"key\":{\"type\":\"string\"}},\"required\":[\"id\",\"key\"]}},\"required\":[\"data\"]}";
        String jsonContentBad = "{\"hello\":\"world\",\"data\":{\"key\":\"value\"}}";
        String jsonContentGood = "{\"hello\":\"world\",\"data\":{\"id\":1,\"key\":\"value\"}}";
        String stringContent = jsonContentGood;

        JSONObject jsonSchemaObject = new JSONObject(jsonSchema);

        SchemaLoader loader = SchemaLoader.builder().schemaJson(jsonSchemaObject).draftV7Support().build();
        Schema schema = loader.load().build();
        try {
            schema.validate(new JSONObject(stringContent));
        } catch (ValidationException ex) {
            log.error(ex.getErrorMessage());
            ex.printStackTrace();
        }
    }
}           

代码简单说明:

  • jsonSchema:json schema 的定义。
  • jsonContentBad:错误的 json 日志,缺少了 id 字段,不能通过校验。
  • jsonContentGood:正确的 json 日志,能够通过校验。

一旦检测到 json 日志中有错误,该程序不仅会报错,而且还能提示具体报错的原因。在本示例中,报错日志中提示“required key [id] not found”,非常明确。

如何通过 Json Schema Validation 保障数据质量——附代码示例

如何使用 Flink 进行 Json Schema Validation

上面的部分介绍了 Java 程序中,如何进行 Json Schema Validation,适合服务端开发的同学。如果是数据开发的同学,如何进行数据验证呢?考虑到:错误日志越早发现和处理,处理成本越小,下面给出一个 Flink 消费 Kafka 日志,进行单条日志数据校验的代码。

// 导入相关类
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
importjava.util.Properties;

// 定义 JSON schema
String jsonSchema = "{"
  + "\"type\": \"object\","
  + "\"properties\": {"
  + "  \"id\": {\"type\": \"integer\"},"
  + "  \"name\": {\"type\": \"string\"},"
  + "  \"age\": {\"type\": \"integer\"}"
  + "},"
  + "\"required\": [\"id\", \"name\", \"age\"]"
  + "}";

// 创建 StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 创建 Kafka 数据源
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic", new JSONKeyValueDeserializationSchema(), props);

// 从 Kafka 中读取数据流
DataStream<String> stream = env.addSource(consumer).map(record -> record.value().toString());

// 使用 JSON Schema Validator 对数据进行验证
stream
  .map(jsonString -> {
    // 将 JSON 字符串解析为 JSON 对象
    Object json = new JSONObject(jsonString);
    return json;
  })
  .filter(new IterativeCondition<Object>() {
    @Override
    public boolean filter(Object json, Context<Object> context) throws Exception {
      // 使用 JSON Schema Validator 对 JSON 对象进行验证
      return JsonSchemaValidator.isValid(jsonSchema, json.toString());
    }
  })
  .map(json -> {
    // 对验证通过的数据进行处理
    // ...
    return json;
  });

// 执行 Flink 程序
env.execute("JSON Schema Validator Example");           

进行日志校验的关键代码是:JsonSchemaValidator.isValid(jsonSchema, json.toString());

原理跟 Java 代码的原理基本相同,不再赘述。要注意的是:代码中使用了 Flink CEP 库,需要先引入进来。

总结

本文从数据质量监控出发,介绍了 Json Schema Validation 的必要性,通过与 Json Validation 的对比,介绍了 Json Schema Validation 提供了一种更加精细、更加个性化的数据验证功能。在文章的后半部分,通过 Java 和 Flink 代码示例,介绍了 Json Schema Validation 的具体使用方法。

总之,使用 Json Schema Validation 进行数据验证可以提高数据的准确性和一致性,加强数据安全性,降低数据处理成本,适用于各种数据处理场景,可以提高数据处理的效率和准确性。

相关文章导读

数据应用相关

  • 用户画像-如何构建用户画像系统
  • 数据仓库 - 7种缓慢变化维的具体处理方法

数据架构相关

  • 云上流批一体架构设计与落地

指标体系相关

  • 搭建指标体系的实施与落地过程
  • 如何搭建指标体系?OSM+ARGO+金字塔原理
  • 基于开源可视化数据探索平台 Superset 的指标体系建

数据治理相关

  • 数据治理-如何实施数据治理
  • 数据管理成熟度模型
  • 数据质量成熟度评分卡

开源技术相关

  • 手把手教你源码安装 Dolphin Scheduler 作业调度系统
  • Dolphin Scheduler:从 Shell 工作流说到代码解析任务间的依赖关系
  • 手把手教你源码安装 Data Ease 开源数据可视化分析工具

继续阅读