天天看點

【Kafka】Kafka Schema Registry 原理1.概述Kafka 用戶端使用原理Registry 服務端

【Kafka】Kafka Schema Registry 原理1.概述Kafka 用戶端使用原理Registry 服務端

1.概述

原文:https://www.dazhuanlan.com/2019/12/10/5deec4add22d4/

Confluent 公司為了能讓 Kafka 支援 Avro 序列化,建立了 Kafka Schema Registry 項目,項目位址為 https://github.com/confluentinc/schema-registry 。對于存儲大量資料的 kafka 來說,使用 Avro 序列化,可以減少資料的存儲空間提高了存儲量,減少了序列化時間提高了性能。 Kafka 有多個topic,裡面存儲了不同種類的資料,每種資料都對應着一個 Avro schema 來描述這種格式。Registry 服務支援友善的管理這些 topic 的schema,它還對外提供了多個 restful 接口,用于存儲和查找。

Avro 序列化示例

Avro 序列化相比常見的序列化(比如 json)會更快,序列化的資料會更小。相比 protobuf ,它可以支援實時編譯,不需要像 protobuf 那樣先定義好資料格式檔案,編譯之後才能使用。下面簡單的介紹下 如何使用 Avro 序列化:

資料格式檔案:

{
 "namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}
           

序列化生成位元組:

Schema schema = new Schema.Parser().parse(new File("user.avsc"));
// 建立一個執行個體
GenericRecord user = new GenericData.Record(schema);
user.put("name", "Alyssa");
user.put("favorite_number", 256);

// 建構輸出流,儲存結果
ByteArrayOutputStream out = new ByteArrayOutputStream();
// BinaryEncoder負責向輸出流,寫入資料
BinaryEncoder encoder =  EncoderFactory.get().directBinaryEncoder(out, null);
// DatumWriter負責序列化
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
// 調用DatumWriter序列化,并将結果寫入到輸出流
datumWriter.write(user, encoder);
// 重新整理緩存
encoder.flush();

// 擷取序列化的結果
byte[] result = out.toByteArray();
           

更多用法可以參見官方文檔,http://avro.apache.org/docs/current/gettingstartedjava.html

Kafka 用戶端使用原理

Kafka Schema Registry 提供了 KafkaAvroSerializer 和 KafkaAvroDeserializer 兩個類。Kafka 如果要使用 Avro 序列化, 在執行個體化 KafkaProducer 和 KafkaConsumer 時, 指定序列化或反序列化的配置。

用戶端發送資料的流程圖如下所示:

【Kafka】Kafka Schema Registry 原理1.概述Kafka 用戶端使用原理Registry 服務端

我們向 kafka 發送資料時,需要先向 Schema Registry 注冊 schema,然後序列化發送到 kafka 裡。當我們需要從 kafka 消費資料時,也需要先從 Schema Registry 擷取 schema,然後才能解析資料。

下面以執行個體 KafkaProducer 的使用為例

public class  {

    public static void main(String[] args) throws Exception {
        
        String kafkaHost = "xxx.xxx.xxx.xxx:9092";
        String topic = "schema-tutorial";
        String schameFilename = "user.json";
        String registryHost = "http://xxx.xxx.xxx.xxx:8081";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 指定Value的序列化類,KafkaAvroSerializer
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        // 指定 registry 服務的位址
        // 如果 Schema Registry 啟動了高可用,那麼這兒的配置值可以是多個服務位址,以逗号隔開
        props.put("schema.registry.url", registryHost);
        KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);

        String key = "Alyssa key";
        Schema schema = new Schema.Parser().parse(new File(schameFilename));
        GenericRecord avroRecord = new GenericData.Record(schema);
        avroRecord.put("name", "Alyssa");
        avroRecord.put("favorite_number", 256);

        // 發送消息
        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, key, avroRecord);
        producer.send(record);
        producer.flush();
        producer.close();
    }
}
           

上面使用到了 KafkaAvroSerializer 序列化消息,接下來看看 KafkaAvroSerializer 的 原理。我們知道 Kafka 的消息由 Key 和 Value 組成,這兩部分的值可以有不同的資料格式。而這些資料格式都會儲存在 Registry 服務端,用戶端需要指定資料格式的名稱(在 Registry 中叫做 subject),才能擷取到。如果我們要擷取目前消息 Key 這部分的資料格式,它對于的 subject 名稱為 -key,如果要擷取 Value 這部分的資料格式,它對應的 subject 名稱為 -value(topic 為該消息所在的 topic 名稱)。

Kafka Schema Registry 還支援修改資料格式,這樣對于同一個 topic ,它的消息有多個版本,前面的消息和最新的消息都可能會完全不一樣,那麼客戶怎麼區分呢。Registry 會為每種資料格式都會配置設定一個 id 号,然後發送的每條消息都會附帶對應的資料格式 id。

KafkaProducer 在第一次序列化的時候,會自動向 Registry 服務端注冊。服務端儲存資料格式後,會傳回一個 id 号。KafkaProducer發送消息的時候,需要附帶這個 id 号。這樣 KafkaConsumer 在讀取消息的時候,通過這個 id 号,就可以從 Registry 服務端 擷取。

Registry 用戶端負責向服務端發送 http 請求,然後會将結果緩存起來,以提高性能。

public class CachedSchemaRegistryClient implements SchemaRegistryClient {
  // Key 為資料格式的名稱, 裡面的 Value 為 Map類型,它對于的 Key 為資料格式,Value 為對應的 id 号
  private final Map<String, Map<Schema, Integer>> schemaCache;
  // Key 為資料格式的名稱,裡面的 Value 為 Map類型,它對于的 Key 為 id 号,Value 為對應的資料格式
  // 這個集合比較特殊,當 Key 為 null 時,表示 id 到 資料格式的緩存
  private final Map<String, Map<Integer, Schema>> idCache;
        
  
  public synchronized int register(String subject, Schema schema, int version, int id)
      throws IOException, RestClientException {
    // 從schemaCache查找緩存,如果不存在則初始化空的哈希表
    final Map<Schema, Integer> schemaIdMap =
        schemaCache.computeIfAbsent(subject, k -> new HashMap<>());

    // 擷取對應的 id 号
    final Integer cachedId = schemaIdMap.get(schema);
    if (cachedId != null) {
      // 檢查 id 号是否有沖突
      if (id >= 0 && id != cachedId) {
        throw new IllegalStateException("Schema already registered with id "
            + cachedId + " instead of input id " + id);
      }
      // 傳回緩存的 id 号
      return cachedId;
    }

    if (schemaIdMap.size() >= identityMapCapacity) {
      throw new IllegalStateException("Too many schema objects created for " + subject + "!");
    }
      
    // 如果緩存沒有,則向服務端發送 http 請求 
    final int retrievedId = id >= 0
                            ? registerAndGetId(subject, schema, version, id)
                            : registerAndGetId(subject, schema);
    // 緩存結果
    schemaIdMap.put(schema, retrievedId);
    idCache.get(null).put(retrievedId, schema);
    return retrievedId;
  }
}
           

Registry 服務端

存儲資料

Registry 服務端将資料格式存儲到 Kafka 中,對應的 topic 名稱為 _schemas。存儲消息的格式如下:

Key 部分,包含資料格式名稱,版本号,由 SchemaRegistryKey 類表示。

Value部分,包含資料格式名稱,版本号, 資料格式 id 号,資料格式的内容,是否被删除, 由 SchemaRegistryValue 類表示。

Registry 服務端在存儲Kafka之前,還會将上述的 Key 和 Value 序列化,目前序列化由兩種方式:

json 序列化,由 ZkStringSerializer 類負責

将 SchemaRegistryKey 或 SchemaRegistryValue 強制轉換為 String 類型儲存起來

處理請求

Registry 服務端主要負責兩種請求,注冊資料格式 schema 請求和 擷取資料格式 schema 請求。

如果 Registry 服務端啟動了高可用,說明有多個服務端在運作。如果注冊 schema 請求發送給了 follower,那麼 follower 會将請求轉發給 leader。至于擷取 schema 請求,follower 和 leader 都能處理,因為 schema 最後都存在了 kafka 中,它們直接從 kafka 裡讀取。

處理注冊 schema 請求

public class KafkaSchemaRegistry implements SchemaRegistry, MasterAwareSchemaRegistry {

  public int registerOrForward(String subject,
                               Schema schema,
                               Map<String, String> headerProperties)
      throws SchemaRegistryException {
    // 檢測這個schema是否之前注冊過
    Schema existingSchema = lookUpSchemaUnderSubject(subject, schema, false);
    if (existingSchema != null) {
      if (schema.getId() != null && schema.getId() >= 0 && !schema.getId().equals(existingSchema.getId())
      ) {
        throw new IdDoesNotMatchException(existingSchema.getId(), schema.getId());
      }
      return existingSchema.getId();
    }

    synchronized (masterLock) {
      if (isMaster()) {
        // 如果是leader,那麼執行register方法,寫schema到kafka
        return register(subject, schema);
      } else {
        // 如果是follower,那麼轉發請求到 leader
        if (masterIdentity != null) {
          return forwardRegisterRequestToMaster(subject, schema, headerProperties);
        } else {
          throw new UnknownMasterException("Register schema request failed since master is "
                                           + "unknown");
        }
      }
    }
  }
}
           

上面調用了 register 方法儲存 schema,同時它也為這個 schema 配置設定了一個 id。這裡簡單說下自增 id 生成器的算法,目前有兩種實作方式。一種是基于記憶體的方式,自己維護一個計數器。另外一種是基于zookeeper的方式,每次從 zookeeper 擷取一個 id 段,然後一個 id,一個 id 的配置設定出去。

處理擷取 schema 請求

public class KafkaSchemaRegistry implements SchemaRegistry, MasterAwareSchemaRegistry {
  // 負責讀取kafka
  final KafkaStore<SchemaRegistryKey, SchemaRegistryValue> kafkaStore;
  // 緩存
  private final LookupCache<SchemaRegistryKey, SchemaRegistryValue> lookupCache;
    
  
  public SchemaString get(int id) throws SchemaRegistryException {
    SchemaValue schema = null;
    try {
      // 從緩存中查找,根據 id 擷取消息的key
      SchemaKey subjectVersionKey = lookupCache.schemaKeyById(id);
      if (subjectVersionKey == null) {
        return null;
      }
      // 從kafka中讀取消息的value
      schema = (SchemaValue) kafkaStore.get(subjectVersionKey);
      if (schema == null) {
        return null;
      }
    } catch (StoreException e) {
      throw new SchemaRegistryStoreException(...);
    }
    // 傳回結果
    SchemaString schemaString = new SchemaString();
    schemaString.setSchemaString(schema.getSchema());
    return schemaString;
  }
}
           

高可用

如果要實作高可用,需要運作多個 Registry 服務,這些服務中必須選擇出一個 leader,所有的請求都是最終 由 leader 來負責。當 leader 挂掉之後,就會觸發選舉操作,來選舉出新的 leader。選舉的實作有兩種方式,基于kafka 和 基于 zookeeper。

基于 kafka 的原理是利用消費組,因為消費組的每個成員都需要和 kafka coordinator 服務端保持心跳,如果有成員挂了,那麼就會觸發組的重配置設定操作。重配置設定操作會從存活的成員中,選出 leader 角色。

KafkaGroupMasterElector 啟動了一個心跳線程,定期發送心跳請求。它 實作了監聽器的接口,當出發開始選舉時會調用onRevoked方法,當選舉完之後會調用onAssigned方法。

public class KafkaGroupMasterElector implements MasterElector, SchemaRegistryRebalanceListener {


  public void init() throws SchemaRegistryTimeoutException, SchemaRegistryStoreException {
    // 心跳線程
    executor = Executors.newSingleThreadExecutor();
    executor.submit(new Runnable() {
      
      public void run() {
        try {
          while (!stopped.get()) {
            // 循環調用poll方法,處理心跳
            coordinator.poll(Integer.MAX_VALUE);
          }
        } catch (Throwable t) {
          log.error("Unexpected exception in schema registry group processing thread", t);
        }
      }
    });

  public void onRevoked() {
    log.info("Rebalance started");
    try {
      // 因為要重新選舉,是以将之前的leader清空
      schemaRegistry.setMaster(null);
    } catch (SchemaRegistryException e) {
      // This shouldn't be possible with this implementation. The exceptions from setMaster come
      // from it calling nextRange in this class, but this implementation doesn't require doing
      // any IO, so the errors that can occur in the ZK implementation should not be possible here.
      log.error(
          "Error when updating master, we will not be able to forward requests to the master",
          e
      );
    }
  }
  
  // assignment為選舉結果
  public void onAssigned(SchemaRegistryProtocol.Assignment assignment, int generation) {
    log.info("Finished rebalance with master election result: {}", assignment);
    try {
      switch (assignment.error()) {
        case SchemaRegistryProtocol.Assignment.NO_ERROR:
          if (assignment.masterIdentity() == null) {
            log.error(...);
          }
          // 記錄配置設定結果
          schemaRegistry.setMaster(assignment.masterIdentity());
          joinedLatch.countDown();
          break;
        case SchemaRegistryProtocol.Assignment.DUPLICATE_URLS:
          throw new IllegalStateException(...);
        default:
          throw new IllegalStateException(...);
      }
    } catch (SchemaRegistryException e) {
      ......
    }
  }
}
           

基于 zookeeper 的方式,會更加簡單,效率也更高。因為隻有 leader 挂掉,zookeeper 才會觸發重新選舉。而基于 kafka 的方式,隻要是有一個成員挂掉,不管它是不是 leader,都會觸發重新選舉。如果這個成員不是 leader,則會造成不必要的選舉。

使用zookeeper方式的原理是,所有 Registry 服務都會監聽一個臨時節點,而隻有 leader 才會占有這個節點。當 leader 挂掉之後,臨時節點會消失。其餘的服務發現臨時節點不存在,就會立即嘗試重新建立,而隻有一個服務能夠建立成功,成為 leader。

public class ZookeeperMasterElector implements MasterElector {
  // 選舉使用的臨時節點    
  private static final String MASTER_PATH = "/schema_registry_master";
    
  public void electMaster() throws
      SchemaRegistryStoreException, SchemaRegistryTimeoutException,
      SchemaRegistryInitializationException, IdGenerationException {
    SchemaRegistryIdentity masterIdentity = null;
    try {
      // 嘗試在zookeeper中,建立臨時節點
      zkUtils.createEphemeralPathExpectConflict(MASTER_PATH, myIdentityString,
                                                zkUtils.defaultAcls(MASTER_PATH));
      log.info("Successfully elected the new master: " + myIdentityString);
      masterIdentity = myIdentity;
      schemaRegistry.setMaster(masterIdentity);
    } catch (ZkNodeExistsException znee) {
      // 建立失敗
      readCurrentMaster();
    }
  }    
    
    
  private class MasterChangeListener implements IZkDataListener {

    public MasterChangeListener() {
    }

    // 當資料被更改後,觸發
    
    public void handleDataChange(String dataPath, Object data) {
      
      try {
        if (isEligibleForMasterElection) {
          // 選舉
          electMaster();
        } else {
          // 從節點中讀取leader
          readCurrentMaster();
        }
      } catch (SchemaRegistryException e) {
        log.error("Error while reading the schema registry master", e);
      }
    }

    // leader 挂掉
    
    public void handleDataDeleted(String dataPath) throws Exception {
      if (isEligibleForMasterElection) {
        // 如果允許選舉,那麼立即執行
        electMaster();
      } else {
        // 否則設定之前的leader為空
        schemaRegistry.setMaster(null);
      }
    }
  }

}
           

https://www.dazhuanlan.com/2019/12/10/5deec4add22d4/

參考資料

https://docs.confluent.io/current/schema-registry/index.html

https://github.com/confluentinc/schema-registry

http://avro.apache.org/docs/current/gettingstartedjava.html

繼續閱讀