为什么要使用 Json Schema Validation
在大数据时代,数据的数量和复杂性不断增加,数据质量的管理变得尤为重要。数据质量的好坏直接影响到决策的准确性和效果。对数据质量的管控,一般可以通过以下几个方面进行:
- 确定数据质量目标。根据业务需求和数据使用场景,确定数据质量目标,包括数据准确性、完整性、一致性、可靠性、及时性和安全性等方面。
- 定义数据质量指标。根据数据质量目标,定义数据质量指标,例如数据错误率、数据重复率、数据缺失率、数据一致性等,以便对数据质量进行量化评估。
- 实施数据质量监控。通过数据质量监控工具和技术手段,对数据进行实时监控和分析,及时发现数据质量问题,并进行处理。
- 实施数据质量评估。通过数据质量评估工具和技术手段,对数据进行定期评估,评估数据质量是否符合预期目标,并及时发现数据质量问题,进行改进和优化。
- 建立数据质量治理体系。建立数据质量治理体系,包括数据质量管理流程、数据质量管理规范、数据质量管理组织和数据质量管理工具等,以确保数据质量的持续改进和监管。
本文围绕 实施数据质量监控 这个方面,在实际操作层面,介绍如何通过 Json Schema Validation 来保障数据质量。在文章的第 3 部分和第 4 部分,分别介绍 Java 和 Flink 的代码示例,方便读者理解和使用。
什么是 Json Schema Validation
原始日志的格式通常使用 json 格式进行上报和接收的,方便扩展、使用灵活。日志上报和接收过程中,难免会出现日志格式错误、字段类型错误、字段缺失等问题。如果有一种方法,能够识别出日志的问题,自动发现错误并且告警通知到相关同学,能够降低错误日志的处理成本,提升效率。Json Schema Validation 能够完成这项工作。
在介绍 Json Schema Validation 之前,你可能已经听过 Json Validation 。 那么,Json Schema Validation 和 Json Validation 有什么区别呢?
JSON Validation 适用于简单的数据验证场景,主要验证 JSON 数据的语法是否正确,如{} [] 语法是否匹配、值是否是有效的类型(字符串、数字等)。
JSON Schema Validation 更适用于复杂的数据验证场景,对 JSON schema 中定义的规则进行验证。例如验证数据结构、关系和约束等。可以在数据处理、分析、挖掘等环节中使用,以确保数据的正确性和一致性。
相比 JSON Validation,JSON Schema Validation 提供了更加精细、更加个性化的数据验证功能。
如何使用 Java 进行 Json Schema Validation
1.日志验证效果介绍
在正式介绍代码之前,先看下面 2 张图,分别是对正确的日志和错误的日志,进行 JSON Schema Validation 的结果。第 1 张图是处理正确的日志,程序运行之后,没有报错;第 2 张图是处理错误的日志,程序运行之后,不但报错了,还提示:“日志中少了 id 字段”。
通过校验的日志
通不过校验的日志
2.Java 代码介绍
步骤一:引入 everit-json-schema maven 依赖。
<dependency>
<groupId>com.github.erosb</groupId>
<artifactId>everit-json-schema</artifactId>
<version>1.14.2</version>
</dependency>
步骤二:定义 json schema 。通过定义 json schema,能够定义 json 日志的以下规则:
- json 日志中包含哪些字段
- 字段的层级结构是怎样的
- 每个字段的类型什么
- 字段的最大最小值是什么
- 哪些字段是必选的,哪些字段是可选的
json schema 本身也是一个 json,如下所示:
{
"type": "object",
"properties": {
"data": {
"type": "object",
"properties": {
"id": {
"type": "integer",
"minimum": 0
},
"key": {
"type": "string"
}
},
"required": [
"id",
"key"
]
}
},
"required": [
"data"
]
}
步骤三:引入 everit-json-schema 库,定义了 json shcema 之后,就可以编写 Java 代码了。
import lombok.extern.slf4j.Slf4j;
import org.everit.json.schema.Schema;
import org.everit.json.schema.ValidationException;
import org.everit.json.schema.loader.SchemaLoader;
import org.json.JSONObject;
import javax.swing.text.StringContent;
@Slf4j
public class JsonValidationDemo {
public static void main(String[] args) {
new JsonValidationDemo().process();
}
private void process() {
String jsonSchema = "{\"type\":\"object\",\"properties\":{\"data\":{\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"integer\",\"minimum\":0},\"key\":{\"type\":\"string\"}},\"required\":[\"id\",\"key\"]}},\"required\":[\"data\"]}";
String jsonContentBad = "{\"hello\":\"world\",\"data\":{\"key\":\"value\"}}";
String jsonContentGood = "{\"hello\":\"world\",\"data\":{\"id\":1,\"key\":\"value\"}}";
String stringContent = jsonContentGood;
JSONObject jsonSchemaObject = new JSONObject(jsonSchema);
SchemaLoader loader = SchemaLoader.builder().schemaJson(jsonSchemaObject).draftV7Support().build();
Schema schema = loader.load().build();
try {
schema.validate(new JSONObject(stringContent));
} catch (ValidationException ex) {
log.error(ex.getErrorMessage());
ex.printStackTrace();
}
}
}
代码简单说明:
- jsonSchema:json schema 的定义。
- jsonContentBad:错误的 json 日志,缺少了 id 字段,不能通过校验。
- jsonContentGood:正确的 json 日志,能够通过校验。
一旦检测到 json 日志中有错误,该程序不仅会报错,而且还能提示具体报错的原因。在本示例中,报错日志中提示“required key [id] not found”,非常明确。
如何使用 Flink 进行 Json Schema Validation
上面的部分介绍了 Java 程序中,如何进行 Json Schema Validation,适合服务端开发的同学。如果是数据开发的同学,如何进行数据验证呢?考虑到:错误日志越早发现和处理,处理成本越小,下面给出一个 Flink 消费 Kafka 日志,进行单条日志数据校验的代码。
// 导入相关类
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
importjava.util.Properties;
// 定义 JSON schema
String jsonSchema = "{"
+ "\"type\": \"object\","
+ "\"properties\": {"
+ " \"id\": {\"type\": \"integer\"},"
+ " \"name\": {\"type\": \"string\"},"
+ " \"age\": {\"type\": \"integer\"}"
+ "},"
+ "\"required\": [\"id\", \"name\", \"age\"]"
+ "}";
// 创建 StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建 Kafka 数据源
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic", new JSONKeyValueDeserializationSchema(), props);
// 从 Kafka 中读取数据流
DataStream<String> stream = env.addSource(consumer).map(record -> record.value().toString());
// 使用 JSON Schema Validator 对数据进行验证
stream
.map(jsonString -> {
// 将 JSON 字符串解析为 JSON 对象
Object json = new JSONObject(jsonString);
return json;
})
.filter(new IterativeCondition<Object>() {
@Override
public boolean filter(Object json, Context<Object> context) throws Exception {
// 使用 JSON Schema Validator 对 JSON 对象进行验证
return JsonSchemaValidator.isValid(jsonSchema, json.toString());
}
})
.map(json -> {
// 对验证通过的数据进行处理
// ...
return json;
});
// 执行 Flink 程序
env.execute("JSON Schema Validator Example");
进行日志校验的关键代码是:JsonSchemaValidator.isValid(jsonSchema, json.toString());
原理跟 Java 代码的原理基本相同,不再赘述。要注意的是:代码中使用了 Flink CEP 库,需要先引入进来。
总结
本文从数据质量监控出发,介绍了 Json Schema Validation 的必要性,通过与 Json Validation 的对比,介绍了 Json Schema Validation 提供了一种更加精细、更加个性化的数据验证功能。在文章的后半部分,通过 Java 和 Flink 代码示例,介绍了 Json Schema Validation 的具体使用方法。
总之,使用 Json Schema Validation 进行数据验证可以提高数据的准确性和一致性,加强数据安全性,降低数据处理成本,适用于各种数据处理场景,可以提高数据处理的效率和准确性。
相关文章导读
数据应用相关
- 用户画像-如何构建用户画像系统
- 数据仓库 - 7种缓慢变化维的具体处理方法
数据架构相关
- 云上流批一体架构设计与落地
指标体系相关
- 搭建指标体系的实施与落地过程
- 如何搭建指标体系?OSM+ARGO+金字塔原理
- 基于开源可视化数据探索平台 Superset 的指标体系建
数据治理相关
- 数据治理-如何实施数据治理
- 数据管理成熟度模型
- 数据质量成熟度评分卡
开源技术相关
- 手把手教你源码安装 Dolphin Scheduler 作业调度系统
- Dolphin Scheduler:从 Shell 工作流说到代码解析任务间的依赖关系
- 手把手教你源码安装 Data Ease 开源数据可视化分析工具