問題描述
當使用SDK連接配接到Azure Event Hub時,最正常的方式為使用連接配接字元串。這種做法參考官網文檔就可成功完成代碼:https://docs.azure.cn/zh-cn/event-hubs/event-hubs-java-get-started-send
隻是,如果使用Azure AD認證方式進行通路,代碼需要如何修改呢? 如何來使用AAD的 TokenCredential呢?
分析問題
在使用Connection String的時候,EventProcessorClientBuilder使用connectionString方法配置連接配接字元串。
如果使用Azure AD認證,則需要先根據AAD中注冊應用擷取到Client ID, Tenant ID, Client Secret,然後把這些内容設定為系統環境變量
- AZURE_TENANT_ID :對應AAD 注冊應用頁面的 Tenant ID
- AZURE_CLIENT_ID :對應AAD 注冊應用頁面的 Application (Client) ID
- AZURE_CLIENT_SECRET :對應AAD 注冊應用的 Certificates & secrets 中建立的Client Secrets
然後使用 credential 初始化 EventProcessorClientBuilder 對象
注意點:
1) DefaultAzureCredentialBuilder 需要指定 Authority Host為 Azure China
2) EventProcessorClientBuilder . Credential 方法需要指定Event Hub Namespce 的域名
操作實作
第一步:為AAD注冊應用賦予操作Event Hub Data的權限
Azure 提供了以下 Azure 内置角色,用于通過 Azure AD 和 OAuth 授予對事件中心資料的通路權限:
- Azure 事件中心資料所有者 (Azure Event Hubs Data Owner): 使用此角色可以授予對事件中心資源的完全通路權限。
- Azure 事件中心資料發送者 (Azure Event Hubs Data Sender) : 使用此角色可以授予對事件中心資源的發送通路權限。
- Azure 事件中心資料接收者 (Azure Event Hubs Data Receiver): 使用此角色可以授予對事件中心資源的使用/接收通路權限。
本執行個體中,隻需要接收資料,是以隻賦予了 Azure Event Hubs Data Receiver權限。
第二步:在Java 項目中添加SDK依賴
添加在pom.xml檔案中 dependencies 部分的内容為:azure-identity , azure-messaging-eventhubs , azure-messaging-eventhubs-checkpointstore-blob,最好都是用最新版本,避免出現運作時出現類型沖突或找不到
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>spdemo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>spdemo</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.12.2</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.5.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.5</version>
<configuration>
<archive>
<manifest>
<mainClass>com.example.App</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
第三步:添加完整代碼
package com.example;
import com.azure.core.credential.TokenCredential;
import com.azure.identity.AzureAuthorityHosts;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.messaging.eventhubs.*;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.*;
import com.azure.storage.blob.*;
import java.io.IOException;
import java.sql.Date;
import java.time.Instant;
import java.time.temporal.TemporalUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
/**
* Hello world!
*
*/
public class App {
// private static final String connectionString ="<connectionString>";
// private static final String eventHubName = "<eventHubName>";
// private static final String storageConnectionString = "<storageConnectionString>";
// private static final String storageContainerName = "<storageContainerName>";
public static void main(String[] args) throws IOException {
System.out.println("Hello World!");
// String connectionString ="<connectionString>";
// The fully qualified namespace for the Event Hubs instance. This is likely to
// be similar to:
// {your-namespace}.servicebus.windows.net
// String fullyQualifiedNamespace ="<your event hub namespace>.servicebus.chinacloudapi.cn";
// String eventHubName = "<eventHubName>";
String storageConnectionString = System.getenv("storageConnectionString");
String storageContainerName = System.getenv("storageContainerName");
String fullyQualifiedNamespace = System.getenv("fullyQualifiedNamespace");
String eventHubName = System.getenv("eventHubName");
TokenCredential credential = new DefaultAzureCredentialBuilder().authorityHost(AzureAuthorityHosts.AZURE_CHINA)
.build();
// Create a blob container client that you use later to build an event processor
// client to receive and process events
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
.connectionString(storageConnectionString)
.containerName(storageContainerName)
.buildAsyncClient();
// EventHubProducerClient
// EventHubProducerClient client = new EventHubClientBuilder()
// .credential(fullyQualifiedNamespace, eventHubName, credential)
// .buildProducerClient();
Map<String, EventPosition> initialPartitionEventPosition = new HashMap<>();
initialPartitionEventPosition.put("0", EventPosition.fromSequenceNumber(3000));
// EventProcessorClientBuilder
// Create a builder object that you will use later to build an event processor
// client to receive and process events and errors.
EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
// .connectionString(connectionString, eventHubName)
.credential(fullyQualifiedNamespace, eventHubName, credential)
.initialPartitionEventPosition(initialPartitionEventPosition)
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.processEvent(PARTITION_PROCESSOR)
.processError(ERROR_HANDLER)
.checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));
// EventPosition.f
// Use the builder object to create an event processor client
EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();
System.out.println("Starting event processor");
eventProcessorClient.start();
System.out.println("Press enter to stop.");
System.in.read();
System.out.println("Stopping event processor");
eventProcessorClient.stop();
System.out.println("Event processor stopped.");
System.out.println("Exiting process");
}
public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
PartitionContext partitionContext = eventContext.getPartitionContext();
EventData eventData = eventContext.getEventData();
System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
// Every 10 events received, it will update the checkpoint stored in Azure Blob
// Storage.
if (eventData.getSequenceNumber() % 10 == 0) {
eventContext.updateCheckpoint();
}
};
public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
};
}
附錄:自定義設定 Event Position,當程式運作時,指定從Event Hub中擷取消息的 Sequence Number
使用EventPosition對象中的fromSequenceNumber方法,可以指定一個序列号,Consume端會根據這個号碼擷取之後的消息。其他的方法還有 fromOffset(指定遊标) / fromEnqueuedTime(指定一個時間點,擷取之後的消息) / earliest(從最早開始) / latest(從最後開始擷取新的資料,舊資料不擷取)
Map<String, EventPosition> initialPartitionEventPosition = new HashMap<>();
initialPartitionEventPosition.put("0", EventPosition.fromSequenceNumber(3000));
注意:
Map<String, EventPosition> 中的String 對象為Event Hub的分區ID,如果Event Hub有2個分區,則它的值分别時0,1.
EventPosition 設定的值,隻在Storage Account所儲存在CheckPoint Store中的值沒有,或者小于此處設定的值時,才會起效果。否則,Consume 會根據從Checkpoint中擷取的SequenceNumber為準。
參考資料
授予對 Azure 事件中心的通路權限 :https://docs.azure.cn/zh-cn/event-hubs/authorize-access-event-hubs
對使用 Azure Active Directory 通路事件中心資源的應用程式進行身份驗證 : https://docs.azure.cn/zh-cn/event-hubs/authenticate-application
使用 Java 向/從 Azure 事件中心 (azure-messaging-eventhubs) 發送/接收事件 : https://docs.azure.cn/zh-cn/event-hubs/event-hubs-java-get-started-send
[END]
當在複雜的環境中面臨問題,格物之道需:濁而靜之徐清,安以動之徐生。 雲中,恰是如此!