問題描述
在使用Azure Media Service的官網示例 (media-services-v3-java -->AudioAnalytics -->AudioAnalyzer )代碼的過程中,根據配置添加了 Event Hub 和Storage Account,使用 Event Grid 來擷取擷取Job的運作狀态。
Analyze a media file with a audio analyzer preset
Optional, do the following steps if you want to use Event Grid for job monitoring
Please note, there are costs for using Event Hub. For more details, refer Event Hubs pricing and FAQ
Enable Event Grid resource provider
az provider register --namespace Microsoft.EventGrid
To check if registered, run the next command. You should see "Registered"
az provider show --namespace Microsoft.EventGrid --query "registrationState"
Create an Event Hub
namespace=<unique-namespace-name>
hubname=<event-hub-name>
az eventhubs namespace create --name $namespace --resource-group <resource-group>
az eventhubs eventhub create --name $hubname --namespace-name $namespace --resource-group <resource-group>
Subscribe to Media Services events
hubid=$(az eventhubs eventhub show --name $hubname --namespace-name $namespace --resource-group <resource-group> --query id --output tsv)
amsResourceId=$(az ams account show --name <ams-account> --resource-group <resource-group> --query id --output tsv)
az eventgrid event-subscription create --resource-id $amsResourceId --name <event-subscription-name> --endpoint-type eventhub --endpoint $hubid
- Create a storage account and container for Event Processor Host if you don't have oneCreate a storage account for Event Processor Host
Update appsettings.json with your Event Hub and Storage information StorageAccountName: The name of your storage account.
StorageAccountKey: The access key for your storage account. Navigate to Azure portal, "All resources", search your storage account, then "Access keys", copy key1.
StorageContainerName: The name of your container. Click Blobs in your storage account, find you container and copy the name.
EventHubConnectionString: The Event Hub connection string. search your namespace you just created. <your namespace> -> Shared access policies -> RootManageSharedAccessKey -> Connection string-primary key.
EventHubName: The Event Hub name. <your namespace> -> Event Hubs.
但根據文檔配置完成後,運作代碼,出現長時間卡頓。根據日志輸出,卡頓在 “Creating an event processor host to process events from Event Hub...:” 直到Timeout為止。
Creating a transform...
Transform created
Creating an input asset...
Uploading a media file to the asset...
Creating a job...
Creating an event processor host to process events from:2022-10-02T12:09:05.694
Timeout happened.
Job final state received, unregistering event processor...
Job elapsed time: 1800 second(s).
Job finished.
這是為什麼呢?
怎麼解決卡頓問題呢?
問題解決
因為上面的代碼使用了Azure Event Hub Hub,是以需要了解用戶端是如何從 Event Hub中擷取到資料。
簡單來講,Event Hub作為一個中轉的消息中心,需要使用者自動的發送,接收消息。
本例中,通過Event Grid訂閱了Media Service Job的輸出内容并通過服務自動發送到Event Hub中。是以在 AudioAnalyzer 代碼中,我們隻處理了接收消息。
AudioAnalyzer.java 中聲明了封裝好的 MediaServicesEventProcessor對象。
// Create a event processor host to process events from Event Hub.
Object monitor = new Object();
eventProcessorHost = new MediaServicesEventProcessor(jobName, monitor, null,
config.getEventHubConnectionString(), config.getEventHubName(),
container);
// Define a task to wait for the job to finish.
Callable<String> jobTask = () -> {
synchronized (monitor) {
monitor.wait();
}
return "Job";
};
MediaServicesEventProcessor.java 中初始化 Event process Host對象。使用的Azure官方 com.azure.messaging.eventhubs.EventProcessorClient 包
public MediaServicesEventProcessor(String jobName, Object monitor, String liveEventName,
String eventHubConnectionString, String eventHubName,
BlobContainerAsyncClient container) {
this.eventHubConnectionString = eventHubConnectionString;
this.eventHubName = eventHubName;
this.blobContainer = container;
if (jobName != null) {
this.jobName = jobName.replaceAll("-", "");
} else {
this.jobName = null;
}
this.monitor = buildEventProcessClient();
monitor = this.monitor;
if (liveEventName != null) {
this.liveEventName = liveEventName.replaceAll("-", "");
} else {
this.liveEventName = null;
}
}
...
private EventProcessorClient buildEventProcessClient() {
return new EventProcessorClientBuilder()
.connectionString(this.eventHubConnectionString, this.eventHubName)
.checkpointStore(new BlobCheckpointStore(this.blobContainer))
.consumerGroup("$Default")
.processEvent(eventContext -> this.processEvent(eventContext))
.processError(errorContext -> System.out.println("Partition "
+ errorContext.getPartitionContext().getPartitionId()
+ " onError: " + errorContext.getThrowable().toString()))
.processPartitionInitialization(initializationContextConsumer -> System.out.println("Partition "
+ initializationContextConsumer.getPartitionContext().getPartitionId() + " is opening"))
.processPartitionClose(closeContext -> System.out.println("Partition "
+ closeContext.getPartitionContext().getPartitionId()
+ " is closing for reason " + closeContext.getCloseReason().toString()))
.buildEventProcessorClient();
}
但是,對比Event Hub接收消息的示例代碼,卻發現缺少了最關鍵的 start 方法
);
eventProcessorClient.start();
因為Event Processor Client對象并沒有啟動,是以代碼從Event Hub中根本不能接收消息,直到設定的Timeout時間(30分鐘)到了為止。 這就是程式出現長時間卡頓的根源。
解決辦法很簡單,在MediaServicesEventProcessor.java 中添加 start 方法。并在 AudioAnalyzer.java 中調用
1: 在 MediaServicesEventProcessor.java 中添加 start
2: 在 AudioAnalyzer.java 中調用 start

修改完成後,重新啟動程式,即可從Event Hub中擷取到目前Job的狀态
全部示例代碼參考: https://github.com/LuBu0505/media-services-v3-java/tree/main/AudioAnalytics/AudioAnalyzer
參考資料
使用 Java 向/從 Azure 事件中心 (azure-messaging-eventhubs) 發送/接收事件: https://docs.azure.cn/zh-cn/event-hubs/event-hubs-java-get-started-send#receive-events