天天看點

【Azure 事件中心】從Azure Event Hub中消費資料,如何檢視目前消費用戶端消費資料的Offset和SequenceNumber呢(消息偏移量和序列号)?

問題描述

當通過Azure Event Hub SDK消費Event Hub中的消息時,必須指定一個Storage Account(存儲賬号)用于儲存 Checkpoint (檢查點)。

【Azure 事件中心】從Azure Event Hub中消費資料,如何檢視目前消費用戶端消費資料的Offset和SequenceNumber呢(消息偏移量和序列号)?

比如在C#代碼中,需要指定Storage Account Connection String 和一個Blob Container的名稱。其他語言代碼也是一樣,都需要指定Storage Account。

private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
    private const string eventHubName = "<EVENT HUB NAME>";
    private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>";
    private const string blobContainerName = "<BLOB CONTAINER NAME>";

...... 

  static async Task Main()
    {
        // Read from the default consumer group: $Default
        string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

        // Create a blob container client that the event processor will use 
        storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);

        // Create an event processor client to process events in the event hub
        processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);

        // Register handlers for processing events and handling errors
        processor.ProcessEventAsync += ProcessEventHandler;
        processor.ProcessErrorAsync += ProcessErrorHandler;

        // Start the processing
        await processor.StartProcessingAsync();

        // Wait for 30 seconds for the events to be processed
        await Task.Delay(TimeSpan.FromSeconds(30));

        // Stop the processing
        await processor.StopProcessingAsync();
    }      

那麼,在Storage Account中如何檢視Offest 和 Sequence Number的值呢?

解答如下

【Azure 事件中心】從Azure Event Hub中消費資料,如何檢視目前消費用戶端消費資料的Offset和SequenceNumber呢(消息偏移量和序列号)?

 如上圖,在Azure門戶頁面中,進入目前使用的Storage Account頁面:

  1. 在指定的Container中,SDK會自動建立一個以Event Hub Namespace主機域名為名的Folder
  2. 然後是Event Hub -> 消費組(預設為 $default) --> Checkpint --> 分區号(從0開始,1,2 ...) 
  3. 在以分區号為名稱的Blob的中繼資料(Metadata)記錄了sequencenumber 和 offset的值。

PS: 當删除或修改這個值後,如果重新開機Event Hub的消費端,這會導緻資料從新的OFFSET開始擷取。

參考資料

Checkpoint: https://docs.azure.cn/zh-cn/event-hubs/event-processor-balance-partition-load#checkpointing

向 Azure 事件中心發送事件及從 Azure 事件中心接收事件 - .NET (Azure.Messaging.EventHubs): https://docs.azure.cn/zh-cn/event-hubs/event-hubs-dotnet-standard-getstarted-send#receive-events

當在複雜的環境中面臨問題,格物之道需:濁而靜之徐清,安以動之徐生。 雲中,恰是如此!

繼續閱讀