為什麼要使用 Json Schema Validation
在大資料時代,資料的數量和複雜性不斷增加,資料品質的管理變得尤為重要。資料品質的好壞直接影響到決策的準确性和效果。對資料品質的管控,一般可以通過以下幾個方面進行:
- 确定資料品質目标。根據業務需求和資料使用場景,确定資料品質目标,包括資料準确性、完整性、一緻性、可靠性、及時性和安全性等方面。
- 定義資料品質名額。根據資料品質目标,定義資料品質名額,例如資料錯誤率、資料重複率、資料缺失率、資料一緻性等,以便對資料品質進行量化評估。
- 實施資料品質監控。通過資料品質監控工具和技術手段,對資料進行實時監控和分析,及時發現資料品質問題,并進行處理。
- 實施資料品質評估。通過資料品質評估工具和技術手段,對資料進行定期評估,評估資料品質是否符合預期目标,并及時發現資料品質問題,進行改進和優化。
- 建立資料品質治理體系。建立資料品質治理體系,包括資料品質管理流程、資料品質管理規範、資料品質管理組織和資料品質管理工具等,以確定資料品質的持續改進和監管。
本文圍繞 實施資料品質監控 這個方面,在實際操作層面,介紹如何通過 Json Schema Validation 來保障資料品質。在文章的第 3 部分和第 4 部分,分别介紹 Java 和 Flink 的代碼示例,友善讀者了解和使用。
什麼是 Json Schema Validation
原始日志的格式通常使用 json 格式進行上報和接收的,友善擴充、使用靈活。日志上報和接收過程中,難免會出現日志格式錯誤、字段類型錯誤、字段缺失等問題。如果有一種方法,能夠識别出日志的問題,自動發現錯誤并且告警通知到相關同學,能夠降低錯誤日志的處理成本,提升效率。Json Schema Validation 能夠完成這項工作。
在介紹 Json Schema Validation 之前,你可能已經聽過 Json Validation 。 那麼,Json Schema Validation 和 Json Validation 有什麼差別呢?
JSON Validation 适用于簡單的資料驗證場景,主要驗證 JSON 資料的文法是否正确,如{} [] 文法是否比對、值是否是有效的類型(字元串、數字等)。
JSON Schema Validation 更适用于複雜的資料驗證場景,對 JSON schema 中定義的規則進行驗證。例如驗證資料結構、關系和限制等。可以在資料處理、分析、挖掘等環節中使用,以確定資料的正确性和一緻性。
相比 JSON Validation,JSON Schema Validation 提供了更加精細、更加個性化的資料驗證功能。
如何使用 Java 進行 Json Schema Validation
1.日志驗證效果介紹
在正式介紹代碼之前,先看下面 2 張圖,分别是對正确的日志和錯誤的日志,進行 JSON Schema Validation 的結果。第 1 張圖是處理正确的日志,程式運作之後,沒有報錯;第 2 張圖是處理錯誤的日志,程式運作之後,不但報錯了,還提示:“日志中少了 id 字段”。
通過校驗的日志
通不過校驗的日志
2.Java 代碼介紹
步驟一:引入 everit-json-schema maven 依賴。
<dependency>
<groupId>com.github.erosb</groupId>
<artifactId>everit-json-schema</artifactId>
<version>1.14.2</version>
</dependency>
步驟二:定義 json schema 。通過定義 json schema,能夠定義 json 日志的以下規則:
- json 日志中包含哪些字段
- 字段的層級結構是怎樣的
- 每個字段的類型什麼
- 字段的最大最小值是什麼
- 哪些字段是必選的,哪些字段是可選的
json schema 本身也是一個 json,如下所示:
{
"type": "object",
"properties": {
"data": {
"type": "object",
"properties": {
"id": {
"type": "integer",
"minimum": 0
},
"key": {
"type": "string"
}
},
"required": [
"id",
"key"
]
}
},
"required": [
"data"
]
}
步驟三:引入 everit-json-schema 庫,定義了 json shcema 之後,就可以編寫 Java 代碼了。
import lombok.extern.slf4j.Slf4j;
import org.everit.json.schema.Schema;
import org.everit.json.schema.ValidationException;
import org.everit.json.schema.loader.SchemaLoader;
import org.json.JSONObject;
import javax.swing.text.StringContent;
@Slf4j
public class JsonValidationDemo {
public static void main(String[] args) {
new JsonValidationDemo().process();
}
private void process() {
String jsonSchema = "{\"type\":\"object\",\"properties\":{\"data\":{\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"integer\",\"minimum\":0},\"key\":{\"type\":\"string\"}},\"required\":[\"id\",\"key\"]}},\"required\":[\"data\"]}";
String jsonContentBad = "{\"hello\":\"world\",\"data\":{\"key\":\"value\"}}";
String jsonContentGood = "{\"hello\":\"world\",\"data\":{\"id\":1,\"key\":\"value\"}}";
String stringContent = jsonContentGood;
JSONObject jsonSchemaObject = new JSONObject(jsonSchema);
SchemaLoader loader = SchemaLoader.builder().schemaJson(jsonSchemaObject).draftV7Support().build();
Schema schema = loader.load().build();
try {
schema.validate(new JSONObject(stringContent));
} catch (ValidationException ex) {
log.error(ex.getErrorMessage());
ex.printStackTrace();
}
}
}
代碼簡單說明:
- jsonSchema:json schema 的定義。
- jsonContentBad:錯誤的 json 日志,缺少了 id 字段,不能通過校驗。
- jsonContentGood:正确的 json 日志,能夠通過校驗。
一旦檢測到 json 日志中有錯誤,該程式不僅會報錯,而且還能提示具體報錯的原因。在本示例中,報錯日志中提示“required key [id] not found”,非常明确。
如何使用 Flink 進行 Json Schema Validation
上面的部分介紹了 Java 程式中,如何進行 Json Schema Validation,适合服務端開發的同學。如果是資料開發的同學,如何進行資料驗證呢?考慮到:錯誤日志越早發現和處理,處理成本越小,下面給出一個 Flink 消費 Kafka 日志,進行單條日志資料校驗的代碼。
// 導入相關類
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
importjava.util.Properties;
// 定義 JSON schema
String jsonSchema = "{"
+ "\"type\": \"object\","
+ "\"properties\": {"
+ " \"id\": {\"type\": \"integer\"},"
+ " \"name\": {\"type\": \"string\"},"
+ " \"age\": {\"type\": \"integer\"}"
+ "},"
+ "\"required\": [\"id\", \"name\", \"age\"]"
+ "}";
// 建立 StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 建立 Kafka 資料源
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic", new JSONKeyValueDeserializationSchema(), props);
// 從 Kafka 中讀取資料流
DataStream<String> stream = env.addSource(consumer).map(record -> record.value().toString());
// 使用 JSON Schema Validator 對資料進行驗證
stream
.map(jsonString -> {
// 将 JSON 字元串解析為 JSON 對象
Object json = new JSONObject(jsonString);
return json;
})
.filter(new IterativeCondition<Object>() {
@Override
public boolean filter(Object json, Context<Object> context) throws Exception {
// 使用 JSON Schema Validator 對 JSON 對象進行驗證
return JsonSchemaValidator.isValid(jsonSchema, json.toString());
}
})
.map(json -> {
// 對驗證通過的資料進行處理
// ...
return json;
});
// 執行 Flink 程式
env.execute("JSON Schema Validator Example");
進行日志校驗的關鍵代碼是:JsonSchemaValidator.isValid(jsonSchema, json.toString());
原理跟 Java 代碼的原理基本相同,不再贅述。要注意的是:代碼中使用了 Flink CEP 庫,需要先引入進來。
總結
本文從資料品質監控出發,介紹了 Json Schema Validation 的必要性,通過與 Json Validation 的對比,介紹了 Json Schema Validation 提供了一種更加精細、更加個性化的資料驗證功能。在文章的後半部分,通過 Java 和 Flink 代碼示例,介紹了 Json Schema Validation 的具體使用方法。
總之,使用 Json Schema Validation 進行資料驗證可以提高資料的準确性和一緻性,加強資料安全性,降低資料處理成本,适用于各種資料處理場景,可以提高資料處理的效率和準确性。
相關文章導讀
資料應用相關
- 使用者畫像-如何建構使用者畫像系統
- 資料倉庫 - 7種緩慢變化維的具體處理方法
資料架構相關
- 雲上流批一體架構設計與落地
名額體系相關
- 搭建名額體系的實施與落地過程
- 如何搭建名額體系?OSM+ARGO+金字塔原理
- 基于開源可視化資料探索平台 Superset 的名額體系建
資料治理相關
- 資料治理-如何實施資料治理
- 資料管理成熟度模型
- 資料品質成熟度評分卡
開源技術相關
- 手把手教你源碼安裝 Dolphin Scheduler 作業排程系統
- Dolphin Scheduler:從 Shell 工作流說到代碼解析任務間的依賴關系
- 手把手教你源碼安裝 Data Ease 開源資料可視化分析工具