(Azure Service Bus服務總線的兩大類消息處理方式: 隊列Queue和主題Topic)
問題描述
使用Service Bus作為企業消息代理,當有大量的資料堆積再Queue或Topic中時,如何來優化接收端處理消息的能力呢?
詳細解釋
在接收端(Receover)的代碼中,有兩個屬性與處理消息的能力有關。一是maxConcurrentCalls(最大并發處理數), 二是prefetchCount (預提取消息數)。 在Service Bus的SDK(azure-messaging-servicebus:7.0.0.0)中,他們的描述如下:
maxConcurrentCalls | 接收端所定義的ServiceBusProcessorClient處理的最大并發消息數。 The max concurrent messages that should be processed by the processor. | |
prefetchCount | 接收端要預先提取的消息數 The number of messages to prefetch | |
在初始化ServiceBusProcessorClient對象時,可以設定maxConcurrentCalls和prefetchCount的值。如
// Create an instance of the processor through the ServiceBusClientBuilder
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder().processError(errorHandler).maxConcurrentCalls(5).prefetchCount(10).buildProcessorClient();
實驗驗證
在本次的實驗中,如何來驗證maxConcurrentCalls值啟作用了呢?如何判斷prefetchCount是否擷取了消息呢?
- 針對maxConcurrentCalls,可以在處理消息的代碼中[processMessage(messageProcessor)]列印出目前線程的ID[Thread.currentThread().getId()]。
- 針對prefetchCount,可以從側面來驗證,即擷取message的DeliveryCount來判斷已經預提取了多少次
本次實驗的代碼參考Azure Service Bus的快速入門文檔所編寫,文末包含全部的代碼和POM.XML檔案。
首先在代碼中設定concall和prefetch值。預設情況下為1.本次實驗也從1開始,在設定的10秒鐘之内檢視消費消息的數量。
int concall=1;
int prefetch =1;
// Create an instance of the processor through the ServiceBusClientBuilder
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder().connectionString(connectionString)
.processor().topicName(topicName).subscriptionName(subName).processMessage(messageProcessor)
.processError(errorHandler).maxConcurrentCalls(concall).prefetchCount(prefetch).buildProcessorClient();
System.out.println("Starting the processor");
System.out.println("Set Processor: maxConcurrentCalls = "+concall+", prefetchCount = "+prefetch);
processorClient.start();
然後再處理消息的對象中,列印出目前處理消息的次序,消息ID,Delivery次數,處理消息的線程ID。
Consumer<ServiceBusReceivedMessageContext> messageProcessor = context -> {
ServiceBusReceivedMessage message = context.getMessage();
ordernumber++;
System.out.println(ordernumber + " Message ID:" + message.getMessageId() + ",Current Delivery Count:"
+ message.getDeliveryCount() + ",Current Thread ID:" + Thread.currentThread().getId());
};
第一次實驗:處理消息的線程号隻有一個:21, 在10秒時間中處理23條消息
Hello World! SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Starting the processor Set Processor: maxConcurrentCalls = 1, prefetchCount = 1 1 Message ID:7caf842c-b98e-4bb5-88e6-bbacc8c45044,Current Delivery Count:1,Current Thread ID:21 2 Message ID:0589aa12-9787-46dd-ba80-412cb125abee,Current Delivery Count:1,Current Thread ID:21 3 Message ID:86d891cf-f3fc-42d9-88ba-bb90bf410f53,Current Delivery Count:1,Current Thread ID:21 4 Message ID:df22f493-968d-4ab6-a8f8-73758d365079,Current Delivery Count:1,Current Thread ID:21 ... ... 23 Message ID:4422744a-1fb3-4a5c-a0e8-7b598624de56,Current Delivery Count:1,Current Thread ID:21 Total Process Message Count = 23 in 10 seconds. Stopping and closing the processor Done World! |
第二次實驗:處理消息的線程号有5個:21,21,23,24,25, 在10秒時間中處理42條消息
Set Processor: maxConcurrentCalls = 5, prefetchCount = 10 1 Message ID:71333a8b-82a6-48a6-b313-dd5daf155878,Current Delivery Count:0,Current Thread ID:21 2 Message ID:7349bd87-d52e-462e-b549-0069845a89ae,Current Delivery Count:0,Current Thread ID:22 3 Message ID:6b1ae777-b798-42f1-b9c8-85fe09be2f06,Current Delivery Count:0,Current Thread ID:23 4 Message ID:9fb1a641-a9b2-49b6-a352-da8d7ed77894,Current Delivery Count:0,Current Thread ID:24 5 Message ID:7e27a824-577d-4407-8ec2-2813b426ee49,Current Delivery Count:0,Current Thread ID:25 6 Message ID:24fd3b47-1619-4570-9ccb-55731f5c94a3,Current Delivery Count:0,Current Thread ID:21 ... ... 39 Message ID:5b5a6b32-a9aa-493c-ad3d-c88dc8a15ae4,Current Delivery Count:0,Current Thread ID:24 40 Message ID:1510b7fe-744e-4647-a373-4434e1e1b470,Current Delivery Count:0,Current Thread ID:25 41 Message ID:9a64f921-015d-4372-b1e9-3475c4570597,Current Delivery Count:0,Current Thread ID:21 42 Message ID:b744cc37-f3a4-41ed-9582-2bfdf4dc759c,Current Delivery Count:0,Current Thread ID:22 Total Process Message Count = 42 in 10 seconds. |
第三次實驗:處理消息的線程号有10個:21,21 ... 30, 在10秒時間中處理46條消息
Set Processor: maxConcurrentCalls = 10, prefetchCount = 30 1 Message ID:a07fe5a5-047d-4d25-ad1e-c199ef13b249,Current Delivery Count:1,Current Thread ID:21 2 Message ID:d8a45441-d365-4c71-8483-3b1e2714b1bd,Current Delivery Count:1,Current Thread ID:22 3 Message ID:819512bd-6b45-48dd-8ccf-4f81dc45423a,Current Delivery Count:1,Current Thread ID:23 4 Message ID:0390edb1-6f72-41b5-a81a-b1ff08b257b4,Current Delivery Count:1,Current Thread ID:24 5 Message ID:f36cd0ff-84b4-4bd5-938b-83ba94f857f1,Current Delivery Count:1,Current Thread ID:25 6 Message ID:a9155e92-d1a6-4f42-876d-3b18222c9e09,Current Delivery Count:1,Current Thread ID:26 7 Message ID:d0d6d5b8-8ec1-40f2-aee9-c2273ea8dc0a,Current Delivery Count:1,Current Thread ID:27 8 Message ID:c5d9b0c6-bb40-4004-864f-1c5b0f3b66fc,Current Delivery Count:1,Current Thread ID:28 9 Message ID:a0510766-3651-49bb-9b49-fde39ad721dc,Current Delivery Count:1,Current Thread ID:29 10 Message ID:9114cd88-e3ea-4e29-9ba3-45d162d60e14,Current Delivery Count:1,Current Thread ID:30 11 Message ID:d9634704-6808-46b1-959c-fffd77507818,Current Delivery Count:1,Current Thread ID:21 42 Message ID:8519277f-7f37-407d-9736-580b144bec81,Current Delivery Count:1,Current Thread ID:22 43 Message ID:e1b67b72-ec44-4f94-84b2-3ced2fcff598,Current Delivery Count:1,Current Thread ID:23 44 Message ID:d369226c-1ebd-4505-bb85-74d458c54f37,Current Delivery Count:1,Current Thread ID:24 45 Message ID:66a45a5b-22f9-4758-b793-ae92841faedb,Current Delivery Count:1,Current Thread ID:25 46 Message ID:8f027132-6b66-41fe-ad14-d6e7c437fb38,Current Delivery Count:1,Current Thread ID:26 Total Process Message Count = 46 in 10 seconds. |
三次測試的結論
- 在測試中,由于測試的時長隻有10秒,是以無法得出一個合理的maxConcurrentCalls和prefetchCount值。至少maxCouncurrentCalls的值能大幅度提升接收端(Receiver)處理消息的能力。
- 在第三次的的測試中,我們發現Delivery Count的計數變為了1,這是因為在第二次測試中,我們設定的預提取數量為10,每次提取的數量大于了接收端能處理的數量。在10秒鐘的測試中,并沒有完全處理完所有提取出來的消息,以緻于在第三次測試中,這些消息的Delivery次數從0變成了1。
優化建議
預提取可加快消息流程,方法是在應用程式請求消息時及請求消息前,準備好消息用于本地檢索。
- 通過 ReceiveAndDelete 接收模式,預提取緩存區擷取的所有消息在隊列中不再可用,僅保留在記憶體中預提取緩存區,直到應用程式通過 Receive/ReceiveAsync 或 OnMessage/OnMessageAsync API 接收到它們 。 如果在應用程式接收到消息前終止應用程式,這些消息将丢失,且不可恢複。
- 在 PeekLock 接收模式下,提取到預提取緩存區的消息将以鎖定狀态進入緩存區,并且将逾時時鐘用于鎖定計時。 如果預提取緩存區很大,且處理所需時間過長,以緻消息鎖定在駐留于預提取緩存區,甚至應用程式還在處理消息時就到期,可能出現一些令人困惑的事件要應用程式處理。
如果消息處理需要高度的可靠性,且處理需要大量精力和時間,則建議謹慎使用或者絲毫不用預提取功能。
如果需要較高吞吐量且消息處理通常比較便宜,則預提取會産生顯著的吞吐量優勢。
需要均衡對隊列或訂閱配置的最大預提取數和鎖定持續時間,以便鎖定逾時至少超出最大預提取緩存區大小外加一條消息的累積預期消息處理時間。 同時,鎖定逾時不應過長,防止消息在被意外丢棄後超出其最大 TimeToLive,是以需要消息的鎖定在重新傳送消息前到期。
附錄一:使用Service Bus Explorer工具快速生成大量消息
附錄二:測試執行個體pom.xml内容
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>testgroupid</groupId>
<artifactId>testartifactid</artifactId>
<version>1.0-SNAPSHOT</version>
<name>testartifactid</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.0.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.0.0-beta.7</version>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
附錄三:App.java代碼
package com.servicebus.test;
import com.azure.messaging.servicebus.*;
import com.azure.messaging.servicebus.models.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.sql.Date;
import java.util.Arrays;
import java.util.List;
/**
* Hello world!
*
*/
public class App {
static String connectionString = "Endpoint=sb://xxxxxxxx.servicebus.chinacloudapi.cn/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
static String topicName = "thisistest";
static String subName = "lubusb1";
static int ordernumber = 0;
public static void main(String[] args) throws InterruptedException {
System.out.println("Hello World!");
// sendMessage();
// sendMessageBatch();
receiveMessages();
System.out.println("Done World!");
}
// handles received messages
static void receiveMessages() throws InterruptedException {
// Consumer that processes a single message received from Service Bus
Consumer<ServiceBusReceivedMessageContext> messageProcessor = context -> {
ServiceBusReceivedMessage message = context.getMessage();
ordernumber++;
System.out.println(ordernumber + " Message ID:" + message.getMessageId() + ",Current Delivery Count:"
+ message.getDeliveryCount() + ",Current Thread ID:" + Thread.currentThread().getId());
};
// Consumer that handles any errors that occur when receiving messages
Consumer<Throwable> errorHandler = throwable -> {
System.out.println("Error when receiving messages: " + throwable.getMessage());
if (throwable instanceof ServiceBusReceiverException) {
ServiceBusReceiverException serviceBusReceiverException = (ServiceBusReceiverException) throwable;
System.out.println("Error source: " + serviceBusReceiverException.getErrorSource());
}
};
int concall=10;
int prefetch =30;
// Create an instance of the processor through the ServiceBusClientBuilder
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder().connectionString(connectionString)
.processor().topicName(topicName).subscriptionName(subName).processMessage(messageProcessor)
.processError(errorHandler).maxConcurrentCalls(concall).prefetchCount(prefetch).buildProcessorClient();
System.out.println("Starting the processor");
System.out.println("Set Processor: maxConcurrentCalls = "+concall+", prefetchCount = "+prefetch);
processorClient.start();
TimeUnit.SECONDS.sleep(10);
System.out.println("Total Process Message Count = "+ordernumber+" in 10 seconds.");
System.out.println("Stopping and closing the processor");
processorClient.close();
}
static void sendMessage() {
// create a Service Bus Sender client for the queue
ServiceBusSenderClient senderClient = new ServiceBusClientBuilder().connectionString(connectionString).sender()
.topicName(topicName).buildClient();
// send one message to the topic
senderClient.sendMessage(new ServiceBusMessage("Hello, World!"));
System.out.println("Sent a single message to the topic: " + topicName);
}
static List<ServiceBusMessage> createMessages() {
// create a list of messages and return it to the caller
ServiceBusMessage[] messages = { new ServiceBusMessage("First message"),
new ServiceBusMessage("Second message"), new ServiceBusMessage("Third message") };
return Arrays.asList(messages);
}
static void sendMessageBatch() {
// create a Service Bus Sender client for the topic
ServiceBusSenderClient senderClient = new ServiceBusClientBuilder().connectionString(connectionString).sender()
.topicName(topicName).buildClient();
// Creates an ServiceBusMessageBatch where the ServiceBus.
ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch();
// create a list of messages
List<ServiceBusMessage> listOfMessages = createMessages();
// We try to add as many messages as a batch can fit based on the maximum size
// and send to Service Bus when
// the batch can hold no more messages. Create a new batch for next set of
// messages and repeat until all
// messages are sent.
for (ServiceBusMessage message : listOfMessages) {
if (messageBatch.tryAddMessage(message)) {
continue;
}
// The batch is full, so we create a new batch and send the batch.
senderClient.sendMessages(messageBatch);
System.out.println("Sent a batch of messages to the topic: " + topicName);
// create a new batch
messageBatch = senderClient.createMessageBatch();
// Add that message that we couldn't before.
if (!messageBatch.tryAddMessage(message)) {
System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.",
messageBatch.getMaxSizeInBytes());
}
}
if (messageBatch.getCount() > 0) {
senderClient.sendMessages(messageBatch);
System.out.println("Sent a batch of messages to the topic: " + topicName);
}
// close the client
senderClient.close();
}
}
參考資料
Service Bus Explorer:https://github.com/paolosalvatori/ServiceBusExplorer
預提取 Azure 服務總線消息:https://docs.azure.cn/zh-cn/service-bus-messaging/service-bus-prefetch#if-it-is-faster-why-is-prefetch-not-the-default-option
預提取:https://docs.azure.cn/zh-cn/service-bus-messaging/service-bus-performance-improvements?tabs=net-standard-sdk-2#prefetching
向 Azure 服務總線隊列發送消息并從中接收消息 (Java):https://docs.azure.cn/zh-cn/service-bus-messaging/service-bus-java-how-to-use-queues
當在複雜的環境中面臨問題,格物之道需:濁而靜之徐清,安以動之徐生。 雲中,恰是如此!