天天看點

如何通過 Json Schema Validation 保障資料品質——附代碼示例

作者:資料築夢空間

為什麼要使用 Json Schema Validation

在大資料時代,資料的數量和複雜性不斷增加,資料品質的管理變得尤為重要。資料品質的好壞直接影響到決策的準确性和效果。對資料品質的管控,一般可以通過以下幾個方面進行:

  1. 确定資料品質目标。根據業務需求和資料使用場景,确定資料品質目标,包括資料準确性、完整性、一緻性、可靠性、及時性和安全性等方面。
  2. 定義資料品質名額。根據資料品質目标,定義資料品質名額,例如資料錯誤率、資料重複率、資料缺失率、資料一緻性等,以便對資料品質進行量化評估。
  3. 實施資料品質監控。通過資料品質監控工具和技術手段,對資料進行實時監控和分析,及時發現資料品質問題,并進行處理。
  4. 實施資料品質評估。通過資料品質評估工具和技術手段,對資料進行定期評估,評估資料品質是否符合預期目标,并及時發現資料品質問題,進行改進和優化。
  5. 建立資料品質治理體系。建立資料品質治理體系,包括資料品質管理流程、資料品質管理規範、資料品質管理組織和資料品質管理工具等,以確定資料品質的持續改進和監管。

本文圍繞 實施資料品質監控 這個方面,在實際操作層面,介紹如何通過 Json Schema Validation 來保障資料品質。在文章的第 3 部分和第 4 部分,分别介紹 Java 和 Flink 的代碼示例,友善讀者了解和使用。

如何通過 Json Schema Validation 保障資料品質——附代碼示例

什麼是 Json Schema Validation

原始日志的格式通常使用 json 格式進行上報和接收的,友善擴充、使用靈活。日志上報和接收過程中,難免會出現日志格式錯誤、字段類型錯誤、字段缺失等問題。如果有一種方法,能夠識别出日志的問題,自動發現錯誤并且告警通知到相關同學,能夠降低錯誤日志的處理成本,提升效率。Json Schema Validation 能夠完成這項工作。

在介紹 Json Schema Validation 之前,你可能已經聽過 Json Validation 。 那麼,Json Schema Validation 和 Json Validation 有什麼差別呢?

JSON Validation 适用于簡單的資料驗證場景,主要驗證 JSON 資料的文法是否正确,如{} [] 文法是否比對、值是否是有效的類型(字元串、數字等)。

JSON Schema Validation 更适用于複雜的資料驗證場景,對 JSON schema 中定義的規則進行驗證。例如驗證資料結構、關系和限制等。可以在資料處理、分析、挖掘等環節中使用,以確定資料的正确性和一緻性。

相比 JSON Validation,JSON Schema Validation 提供了更加精細、更加個性化的資料驗證功能。

如何通過 Json Schema Validation 保障資料品質——附代碼示例

如何使用 Java 進行 Json Schema Validation

1.日志驗證效果介紹

在正式介紹代碼之前,先看下面 2 張圖,分别是對正确的日志和錯誤的日志,進行 JSON Schema Validation 的結果。第 1 張圖是處理正确的日志,程式運作之後,沒有報錯;第 2 張圖是處理錯誤的日志,程式運作之後,不但報錯了,還提示:“日志中少了 id 字段”。

如何通過 Json Schema Validation 保障資料品質——附代碼示例

通過校驗的日志

如何通過 Json Schema Validation 保障資料品質——附代碼示例

通不過校驗的日志

2.Java 代碼介紹

步驟一:引入 everit-json-schema maven 依賴。

<dependency>
    <groupId>com.github.erosb</groupId>
    <artifactId>everit-json-schema</artifactId>
    <version>1.14.2</version>
</dependency>           

步驟二:定義 json schema 。通過定義 json schema,能夠定義 json 日志的以下規則:

  • json 日志中包含哪些字段
  • 字段的層級結構是怎樣的
  • 每個字段的類型什麼
  • 字段的最大最小值是什麼
  • 哪些字段是必選的,哪些字段是可選的

json schema 本身也是一個 json,如下所示:

{
  "type": "object",
  "properties": {
    "data": {
      "type": "object",
      "properties": {
        "id": {
          "type": "integer",
          "minimum": 0
        },
        "key": {
          "type": "string"
        }
      },
      "required": [
        "id",
        "key"
      ]
    }
  },
  "required": [
    "data"
  ]
}           

步驟三:引入 everit-json-schema 庫,定義了 json shcema 之後,就可以編寫 Java 代碼了。

import lombok.extern.slf4j.Slf4j;
import org.everit.json.schema.Schema;
import org.everit.json.schema.ValidationException;
import org.everit.json.schema.loader.SchemaLoader;
import org.json.JSONObject;

import javax.swing.text.StringContent;

@Slf4j
public class JsonValidationDemo {
    public static void main(String[] args) {
        new JsonValidationDemo().process();
    }

    private void process() {
        String jsonSchema = "{\"type\":\"object\",\"properties\":{\"data\":{\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"integer\",\"minimum\":0},\"key\":{\"type\":\"string\"}},\"required\":[\"id\",\"key\"]}},\"required\":[\"data\"]}";
        String jsonContentBad = "{\"hello\":\"world\",\"data\":{\"key\":\"value\"}}";
        String jsonContentGood = "{\"hello\":\"world\",\"data\":{\"id\":1,\"key\":\"value\"}}";
        String stringContent = jsonContentGood;

        JSONObject jsonSchemaObject = new JSONObject(jsonSchema);

        SchemaLoader loader = SchemaLoader.builder().schemaJson(jsonSchemaObject).draftV7Support().build();
        Schema schema = loader.load().build();
        try {
            schema.validate(new JSONObject(stringContent));
        } catch (ValidationException ex) {
            log.error(ex.getErrorMessage());
            ex.printStackTrace();
        }
    }
}           

代碼簡單說明:

  • jsonSchema:json schema 的定義。
  • jsonContentBad:錯誤的 json 日志,缺少了 id 字段,不能通過校驗。
  • jsonContentGood:正确的 json 日志,能夠通過校驗。

一旦檢測到 json 日志中有錯誤,該程式不僅會報錯,而且還能提示具體報錯的原因。在本示例中,報錯日志中提示“required key [id] not found”,非常明确。

如何通過 Json Schema Validation 保障資料品質——附代碼示例

如何使用 Flink 進行 Json Schema Validation

上面的部分介紹了 Java 程式中,如何進行 Json Schema Validation,适合服務端開發的同學。如果是資料開發的同學,如何進行資料驗證呢?考慮到:錯誤日志越早發現和處理,處理成本越小,下面給出一個 Flink 消費 Kafka 日志,進行單條日志資料校驗的代碼。

// 導入相關類
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
importjava.util.Properties;

// 定義 JSON schema
String jsonSchema = "{"
  + "\"type\": \"object\","
  + "\"properties\": {"
  + "  \"id\": {\"type\": \"integer\"},"
  + "  \"name\": {\"type\": \"string\"},"
  + "  \"age\": {\"type\": \"integer\"}"
  + "},"
  + "\"required\": [\"id\", \"name\", \"age\"]"
  + "}";

// 建立 StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 建立 Kafka 資料源
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic", new JSONKeyValueDeserializationSchema(), props);

// 從 Kafka 中讀取資料流
DataStream<String> stream = env.addSource(consumer).map(record -> record.value().toString());

// 使用 JSON Schema Validator 對資料進行驗證
stream
  .map(jsonString -> {
    // 将 JSON 字元串解析為 JSON 對象
    Object json = new JSONObject(jsonString);
    return json;
  })
  .filter(new IterativeCondition<Object>() {
    @Override
    public boolean filter(Object json, Context<Object> context) throws Exception {
      // 使用 JSON Schema Validator 對 JSON 對象進行驗證
      return JsonSchemaValidator.isValid(jsonSchema, json.toString());
    }
  })
  .map(json -> {
    // 對驗證通過的資料進行處理
    // ...
    return json;
  });

// 執行 Flink 程式
env.execute("JSON Schema Validator Example");           

進行日志校驗的關鍵代碼是:JsonSchemaValidator.isValid(jsonSchema, json.toString());

原理跟 Java 代碼的原理基本相同,不再贅述。要注意的是:代碼中使用了 Flink CEP 庫,需要先引入進來。

總結

本文從資料品質監控出發,介紹了 Json Schema Validation 的必要性,通過與 Json Validation 的對比,介紹了 Json Schema Validation 提供了一種更加精細、更加個性化的資料驗證功能。在文章的後半部分,通過 Java 和 Flink 代碼示例,介紹了 Json Schema Validation 的具體使用方法。

總之,使用 Json Schema Validation 進行資料驗證可以提高資料的準确性和一緻性,加強資料安全性,降低資料處理成本,适用于各種資料處理場景,可以提高資料處理的效率和準确性。

相關文章導讀

資料應用相關

  • 使用者畫像-如何建構使用者畫像系統
  • 資料倉庫 - 7種緩慢變化維的具體處理方法

資料架構相關

  • 雲上流批一體架構設計與落地

名額體系相關

  • 搭建名額體系的實施與落地過程
  • 如何搭建名額體系?OSM+ARGO+金字塔原理
  • 基于開源可視化資料探索平台 Superset 的名額體系建

資料治理相關

  • 資料治理-如何實施資料治理
  • 資料管理成熟度模型
  • 資料品質成熟度評分卡

開源技術相關

  • 手把手教你源碼安裝 Dolphin Scheduler 作業排程系統
  • Dolphin Scheduler:從 Shell 工作流說到代碼解析任務間的依賴關系
  • 手把手教你源碼安裝 Data Ease 開源資料可視化分析工具

繼續閱讀