前言
對于pulsar的特性以及優異,這裡不多講解,直接上幹貨,主要講一下Pulsar的docker部署,生産者/消費者幾種
不同模式,以及Topic的使用規則
Docker部署pulsar
docker run -it -p 80:80 -p 8080:8080 -p 6650:6650 -d apachepulsar/pulsar-standalone
部署問題
因為我用的是騰訊雲最基礎的伺服器,在執行docker指令後,發現Pulsar會啟動失敗或啟動不久便停止,檢視日志發現是記憶體頂不住
檢視官網Pulsar預設啟動是2g,是以把啟動配置修改成機器支援的即可;
docker exec -it pulsar-test sh
cd /pulsar/conf/
vim conf/pulsar_env.sh; 之後重新開機pulsar即可
連接配接Pulsar
/**
* pulsar 連接配接bean
*/
@Bean
public PulsarClient getPulsarClient() throws PulsarClientException {
return PulsarClient.builder()
.serviceUrl("pulsar://Ip位址:6650")
.build();
}
基礎概念了解
Produce 消息的源頭,也是消息的釋出者,負責将消息發送到 topic。
Consumer 消息的消費者,負責從 topic 訂閱并消費消息。
Topic 消息資料的載體,在 Pulsar 中 Topic 可以指定分為多個 partition,如果不設定預設隻有一個 partition
(這個指定多個partition,我會在文中後面示例示範,可以留意下)
Brkber 一個無狀态元件,主要負責接收 Producer 發送過來的消息,并傳遞給 Consumer,可以了解成送快遞的小哥
Produce詳解
建立方式
簡單方法建立
Producer<String> stringProducer = client.newProducer(Schema.STRING)
.topic("my-topic")
.create();
stringProducer.send("My message")
loadConf自定義配置建立
config裡面可以填一些自定義配置,如sendTimeoutMs 消息發送逾時(毫秒)。如果在sendTimeout過期之前伺服器未确認消息,則會發生錯誤,其他有趣的可以看下官網
Pulsar官網
/**
* 使用loadConf建立Produce
*/
@Test
public void testProducer() throws Exception {
Map<String, Object> config1 = new HashMap<>();
config1.put("producerName", "produce-demo1");
config1.put("topicName", "topic1");
Producer producer1 = client
.newProducer()
.loadConf(config1)
.create();
producer1.send(("test1 --- " + new Date()).getBytes());
}
發送模式
同步發送
同步發送消息是Producer發送消息以後要等到broker的确認以後才會認為消息發送成功,如果沒有收到确認就認為消息發送失敗
/**
* 測試同步發送
*/
@Test
public void testProducer22() throws Exception {
Producer<String> stringProducer = client
.newProducer(Schema.STRING)
.topic("my-topic")
.producerName("produce-demo1")
.create();
MessageId messageId = stringProducer.send("My message" + "發送消息時間" + new Date());
System.out.println("消息同步發送---");
System.in.read();
}
複制代碼
異步發送
異步發送消息是 Producer 發送消息,将消息放到阻塞隊列中并立即傳回。不需要等待 broker 的确認
/**
* 測試異步發送
*/
@Test
public void testProducer222() throws Exception {
Producer<String> stringProducer = client
.newProducer(Schema.STRING)
.topic("my-topic")
.producerName("produce-demo1")
.create();
CompletableFuture<MessageId> messageIdCompletableFuture = stringProducer.sendAsync(
"異步發送的消息");
System.in.read();
}
通路方式/發送方式
Share模式(預設情況)
預設情況下多個生産者可以釋出消息到同一個Topic,指定發送模式.accessMode(ProducerAccessMode.Shared)方法
/**
* shard模式 預設情況下多個生産者可以釋出消息到同一個 Topic
*/
@Test
public void testProducer222() throws IOException {
Producer<String> stringProducer = client
.newProducer(Schema.STRING)
.accessMode(ProducerAccessMode.Shared)
.topic("通路模式-shared")
.producerName("produce-demo1")
.create();
stringProducer.send("My message 1 " + "發送消息時間" + new Date());
Producer<String> stringProducer2 = client
.newProducer(Schema.STRING)
.accessMode(ProducerAccessMode.Shared)
.topic("通路模式-shared")
// Producer with name 'produce-demo1' is already connected to topic
//注意生産者名稱不能重複
.producerName("produce-demo2")
.create();
stringProducer2.send("My message 2 " + "發送消息時間" + new Date());
System.in.read();
}
請注意:
這裡我特意标注了生産者名稱不能重複,否則對于Pulsar來說,發送消息會報錯,如下圖,已經有一個produce-
demo1的生産者了,再來一個就會報錯Producer with name 'produce-demo1' is already connected to topic
是以如果我們是叢集部署的話,尤其注意每一個節點生産者的命名
當然對于消費者也是同樣的規則,不允許名稱重複(在下文我也會示範到)
/**
* 示範生産者名稱重複,發送報錯
*/
@Test
public void testProducer1() throws IOException {
Producer<String> stringProducer = client
.newProducer(Schema.STRING)
.topic("通路模式-shared")
.producerName("produce-demo1")
.create();
stringProducer.send("My message 1 " + "發送消息時間" + new Date());
System.in.read();
}
/**
* 示範生産者名稱重複,發送報錯
*/
@Test
public void testProducer11() throws IOException {
Producer<String> stringProducer = client
.newProducer(Schema.STRING)
.topic("通路模式-shared")
.producerName("produce-demo1")
.create();
stringProducer.send("My message 1 " + "發送消息時間" + new Date());
System.in.read();
}
Exclusive
要求生産者以獨占模式通路 Topic,在此模式下如果 Topic已經有了生産者,那麼其他生産者在連接配接就會失敗報錯。
/**
* Exclusive 要求生産者以獨占模式通路 Topic,在此模式下 如果 Topic 已經有了生産者,那麼其他生産者在連接配接就會失敗報錯。
* <p>
* "Topic has an existing exclusive producer: standalone-0-12
*/
@Test
public void testProducer6() throws IOException {
Producer<String> stringProducer = client
.newProducer(Schema.STRING)
.topic("通路模式-Exclusive")
//設定通路模式 預設shared
.accessMode(ProducerAccessMode.Exclusive)
.producerName("produce-demo1")
.create();
stringProducer.send("My message 1 " + "發送消息時間" + new Date());
Producer<String> stringProducer2 = client
.newProducer(Schema.STRING)
.topic("通路模式-Exclusive")
//設定通路模式 預設shared
.accessMode(ProducerAccessMode.Exclusive)
// Producer with name 'produce-demo1' is already connected to topic
//注意生産者名稱不能重複
.producerName("produce-demo2")
.create();
stringProducer2.send("My message 2 " + "發送消息時間" + new Date());
System.in.read();
}
WaitForExclusive
如果主題已經連接配接了生産者,則将目前生産者挂起,直到生産者獲得了Exclusive 通路權限。
該怎麼來了解這句話,打個不恰當比喻,類似于Java中的獨占鎖Sycronized一樣,你沒有擷取到鎖,沒有擷取到權限,就不能發消息,
對比Exclusive報錯來說,WaitForExclusive是不會報錯的,隻會是挂起,
來看下面的demo感受下
1 我們先開啟一個線程A向 通路模式-WaitForExclusive topic發送一條消息,My message 1 ***
/**
* WaitForExclusive
* <p>
* 如果主題已經連接配接了生産者,則将目前生産者挂起,直到生産者獲得了 Exclusive 通路權限。
* <p>
* 也就是存在相同的生産者,不會報錯,當然也不會發送消息, 擷取到獨占後,會将未擷取到獨占時的消息進行發送!!!
*/
@Test
public void testProducer2() throws Exception {
Producer<String> stringProducer = client
.newProducer(Schema.STRING)
.topic("通路模式-WaitForExclusive")
//設定通路模式 預設shared
.accessMode(ProducerAccessMode.WaitForExclusive)
.producerName("produce-demo1")
.create();
stringProducer.send("My message 1 " + "發送消息時間" + new Date());
System.in.read();
}
2 然後再開啟另一個線程B向 通路模式-WaitForExclusive topic發送10條消息,My message 2 ***
/**
* WaitForExclusive
*/
@Test
public void testProducer22() throws Exception {
Producer<String> stringProducer = client
.newProducer(Schema.STRING)
.topic("通路模式-WaitForExclusive")
//設定通路模式 預設shared
.accessMode(ProducerAccessMode.WaitForExclusive)
.producerName("produce-demo1")
.create();
//假設有10條消息在未擷取 獨占前,均未被發送,模拟來看一下,擷取獨占後, 這10條消息會進行發送嗎 ? 會
for (int i = 0; i < 10; i++) {
stringProducer.send("My message 2 " + "發送消息時間" + new Date());
}
System.in.read();
}
3 然後寫個簡單的消費者看一下消費情況
@Test
public void testConsumer2() throws IOException {
MessageListener myMessageListener = (consumer, msg) -> {
try {
System.out.println("Message received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer = client.newConsumer()
.topic("通路模式-WaitForExclusive")
.subscriptionName("my-subscription")
.messageListener(myMessageListener)
.subscribe()
System.in.read();
}
4 會看到消費者隻消費到了 線程A發送的消息,線程B的消息未被消費,因為此時topic的獨占權還線上程池A上
5 手動殺死線程A,然後看消費者情況,會看到開始消費出My message 2 *** 也就是線程B的消息,
因為此時線程A被殺死,線程B得到了獨占權,線程B将消息發送出去
Consumer詳解
建立方式
簡單方法建立
可以看到寫了一個while true去擷取消息,對于線城是阻塞不友好的,是以我一般用第二種,監聽器方法
/**
* 建立消費者
*/
@Test
public void testConsumer22() throws Exception{
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscribe();
while (true) {
// Wait for a message
Message msg = consumer.receive();
try {
// Do something with the message
System.out.println("Message received: " + new String(msg.getData()));
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg);
} catch (Exception e) {
// Message failed to process, redeliver later
consumer.negativeAcknowledge(msg);
}
}
}
監聽器方法建立
/**
* 接收消息:異步 不阻塞主線程
*/
@Test
public void testConsumer2() throws IOException {
MessageListener myMessageListener = (consumer, msg) -> {
try {
System.out.println("Message received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.messageListener(myMessageListener)
.subscribe();
System.out.println("監聽器方式,不阻塞線程");
System.in.read();
}
loadConf自定義配置建立
更多自定義的配置可以看下官網檔案
/**
* loadConf建立消費者
*/
@Test
public void testConsumer222() throws IOException {
MessageListener myMessageListener = (consumer, msg) -> {
try {
System.out.println("Message received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Map<String, Object> config1 = new HashMap<>();
config1.put("subscriptionName", "consumer-demo1");
config1.put("topicNames", Arrays.asList(new String[]{"my-topic"}));
Consumer consumer = client
.newConsumer()
.loadConf(config1)
.messageListener(myMessageListener)
.subscribe();
System.out.println("loadConf方式");
System.in.read();
}
多主題訂閱
多主題訂閱主要是指一個消費者,可以訂閱多個topic,這裡我隻示範其中兩個
傳入List數組的多主題訂閱
/**
* Multi-topic subscriptions
* 多主題訂閱
* 多topic 訂閱list設定的topic1 topic2
*/
@Test
public void testConsumer3() throws IOException {
MessageListener myMessageListener = (consumer, msg) -> {
try {
System.out.println("Message received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
ConsumerBuilder consumerBuilder = client.newConsumer()
.subscriptionName("consumer-3");
List<String> topics = Arrays.asList(
"topic1",
"topic2"
);
Consumer multiTopicConsumer = consumerBuilder
.topics(topics)
.messageListener(myMessageListener)
.subscribe();
System.in.read();
}
正規表達式多主題訂閱
簡單點就是正規表達式比對,根據業務需要自行設定表達式,這裡不多示範
/**
* Multi-topic subscriptions
* 多主題訂閱
* 正規表達式,訂閱所有以1結束的topic
*
*/
@Test
public void testConsumer222() throws IOException {
MessageListener myMessageListener = (consumer, msg) -> {
try {
System.out.println("Message received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
ConsumerBuilder consumerBuilder = client.newConsumer()
.subscriptionName("consumer-1");
Pattern allTopicsInNamespace = Pattern.compile("public/default/.*1");
Consumer allTopicsConsumer = consumerBuilder
.topicsPattern(allTopicsInNamespace)
.messageListener(myMessageListener)
.subscribe();
System.in.read();
}
消費模式
Exclusive(預設)
這裡需要注意的是同一topic主題上隻能有一個具有相同訂閱名稱的使用者 預設,也就是說 如果後端是叢集部署的話,請注意預設情況下subscriptionName的命名情況,否則會報錯
/**
* Exclusive 模式 也是預設的
* 同一主題上隻能有一個具有相同訂閱名稱的使用者 預設
* 否則會啟動報錯
*/
@Test
public void testConsumerExclusive() throws IOException {
MessageListener myMessageListener = (consumer, msg) -> {
try {
System.out.println("Message received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription2")
.subscriptionType(SubscriptionType.Exclusive)
.messageListener(myMessageListener)
.subscribe();
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription2")
.subscriptionType(SubscriptionType.Exclusive)
.messageListener(myMessageListener)
.subscribe();
System.in.read();
}
Failover
這個主要是失敗轉移,對比Exclusive模式,同一主題上可以有具有相同訂閱名稱的使用者,也就是subscriptionName可以重複,一個節點挂掉了 剩餘消息轉移到另一個節點繼續消費;
這塊的業務場景挺不錯的,假設我們背景有兩台叢集部署機器A,B,并且subscriptionName相同,
正常情況下,其他子產品往隊列仍了一條消息,但是隻希望被其中一台機器消費, 一條消息被消費一次,而不是A,B兩機器都消費對吧,正常的幂等性操作
現在開始模拟,假設其他子產品發送了10條消息,然後隻被其中一台消費
@Test
public void testProduce2() throws PulsarClientException {
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("my-topic")
.enableBatching(false)
.create();
// 3 messages with "key-1", 3 messages with "key-2", 2 messages with "key-3" and 2 messages with "key-4"
// 這裡的key可以類似于 投遞到 不同broker的一個辨別
producer.newMessage().key("key-1").value("message-1-1").send();
producer.newMessage().key("key-1").value("message-1-2").send();
producer.newMessage().key("key-1").value("message-1-3").send();
producer.newMessage().key("key-2").value("message-2-1").send();
producer.newMessage().key("key-2").value("message-2-2").send();
producer.newMessage().key("key-2").value("message-2-3").send();
producer.newMessage().key("key-3").value("message-3-1").send();
producer.newMessage().key("key-3").value("message-3-2").send();
producer.newMessage().key("key-4").value("message-4-1").send();
producer.newMessage().key("key-4").value("message-4-2").send();
}
@Test
public void testConsumerFailover() throws IOException {
MessageListener myMessageListener1 = (consumer, msg) -> {
try {
// a++;
// if (a > 4) {
// System.out.println("模拟節點1故障");
//關閉節點1
// consumer.close();
// throw new RuntimeException("模拟某時刻節點1故障,轉移至節點2消費");
// }
System.out.println("Message1 received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.messageListener(myMessageListener1)
.subscribe();
MessageListener myMessageListener2 = (consumer2, msg) -> {
try {
System.out.println("Message2 received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.messageListener(myMessageListener2)
.subscribe();
System.in.read();
}
再來看看失敗轉移,假設其中一台機器當機,然後我希望剩下機器B,繼續消費未消費完的消息,可以看到一台機器模拟當機後,另一台機器繼續消費,也就是失敗轉移
/**
* Failover故障轉移 .subscriptionName("my-subscription") 可重複
* 一個節點挂掉了 剩餘消息轉移到另一個節點繼續消費
* 注意這些消費模式 都是和subscriptionName("my-subscription") 訂閱者名稱相關
*/
int a = 0;
@Test
public void testConsumerFailover() throws IOException {
MessageListener myMessageListener1 = (consumer, msg) -> {
try {
a++;
if (a > 4) {
System.out.println("模拟節點1故障");
//關閉節點1
consumer.close();
throw new RuntimeException("模拟某時刻節點1故障,轉移至節點2消費");
}
System.out.println("Message1 received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
//consumer.negativeAcknowledge(msg);
}
};
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.messageListener(myMessageListener1)
.subscribe();
MessageListener myMessageListener2 = (consumer2, msg) -> {
try {
System.out.println("Message2 received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.messageListener(myMessageListener2)
.subscribe();
System.in.read();
}
Shared
多個使用者将能夠使用相同的訂閱名稱,并且消息将根據連接配接的使用者之間的循環旋轉進行分派。 在這種模式下,消費順序不能保證
/**
* Shared模式
* 多個使用者将能夠使用相同的訂閱名稱,并且消息将根據連接配接的使用者之間的循環旋轉進行分派。 在這種模式下,消費順序不能保證。
* 也就是消費者 1 消費者2 總共消費10條
* 注意都是從 .subscriptionName("my-subscription") 視角
*/
@Test
public void testShared() throws IOException {
MessageListener myMessageListener1 = (consumer, msg) -> {
try {
System.out.println("Message1 received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.messageListener(myMessageListener1)
.subscribe();
MessageListener myMessageListener2 = (consumer2, msg) -> {
try {
System.out.println("Message2 received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.messageListener(myMessageListener2)
.subscribe();
System.in.read();
}
Key_Shared模式
這個簡單來了解,發送消息的時候,給這批消息指定一個key,那麼消息被消費的時候,相同key的這批消息,隻能被同一個節點消費
如下示例我發送消息時,指定下key,然後寫消費者看下消費情況,會看到key相同的消息被同一節點消費
@Test
public void testProduce2() throws PulsarClientException {
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("my-topic")
.enableBatching(false)
.create();
producer.newMessage().key("key-1").value("message-1-1").send();
producer.newMessage().key("key-1").value("message-1-2").send();
producer.newMessage().key("key-1").value("message-1-3").send();
producer.newMessage().key("key-2").value("message-2-1").send();
producer.newMessage().key("key-2").value("message-2-2").send();
producer.newMessage().key("key-2").value("message-2-3").send();
producer.newMessage().key("key-3").value("message-3-1").send();
producer.newMessage().key("key-3").value("message-3-2").send();
producer.newMessage().key("key-4").value("message-4-1").send();
producer.newMessage().key("key-4").value("message-4-2").send();
}
/**
* Key_Shared模式
* 多個使用者将能夠使用相同的訂閱名稱,并且消息将根據連接配接的使用者之間的循環旋轉進行分派。 在這種模式下,消費順序不能保證。
* 也就是消費者 1 消費者2 總共消費10條
* 注意都是從 .subscriptionName("my-subscription") 視角
* <p>
* 具有相同密鑰的消息僅按順序傳遞給一個消費者。消息在不同消費者之間的可能分布(預設情況下,我們事先不知道哪些密鑰将被配置設定給消費者,但一個密鑰隻會同時被配置設定給消費者
* ("key-1", "message-1-1")
* ("key-1", "message-1-2")
* ("key-1", "message-1-3")
* ("key-3", "message-3-1")
* ("key-3", "message-3-2")
* <p>
* <p>
* ("key-2", "message-2-1")
* ("key-2", "message-2-2")
* ("key-2", "message-2-3")
* ("key-4", "message-4-1")
* ("key-4", "message-4-2")
*/
@Test
public void testKeyShared() throws IOException {
MessageListener myMessageListener1 = (consumer, msg) -> {
try {
System.out.println("Message1 received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.messageListener(myMessageListener1)
.subscribe();
MessageListener myMessageListener2 = (consumer2, msg) -> {
try {
System.out.println("Message2 received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.messageListener(myMessageListener2)
.subscribe();
System.in.read();
}
模式對比
Exclusive隻支援同一topic隻能有一個同名訂閱者,對于目前大多叢集架構,需要每個節點命名subscriptionName不同操作下,
叢集中的每個節點都能收到topic消息,對于特殊場景如 前端websocket連接配接背景叢集這類場景,還是蠻實用
Failover:可以保證在叢集中消息隻被消費一次,幂等性嘛簡單點,正常情況下隻被其中一台機器消費,也就是固定一台機器,這種就很紗布了
Shared: 可以保證在叢集中消息隻被消費一次,也是保證了幂等性,而且消息被叢集平均消費了,壓力down down
Key_Shared 我再想想
Topic
Pulsar對topic的命名有如下規則,
{persistent|non-persistent}://tenant/namespace/topic
- persistent / non-persistent 表示主題的類型,主題分為持久化和非持久化主題,預設是持久化的類型。持久化的主題會将消息儲存到磁盤上,而非持久化的主題就不會将消息儲存到磁盤。
- tenant Pulsar 中主題的租戶,租戶對于 Pulsar中的多租戶至關重要,并且分布在叢集中。
- namespace 将相關聯的 Topic 作為一個組來管理,是管理 Topic 的基本單元。每個租戶可以有一個或多個命名空間。
在上面的示例,我們都沒有去關注persistent,tenant,namespace的玩法,因為你不去特殊設定的話,pulsar都有預設的
我們可以嘗試往persistent://sample/namespace_test4/topic-haha1直接發一條消息,你會發現發送報錯Policies not found for sample/namespace_test4 namespace
/**
* 報錯
* 向租戶sample 命名空間 namespace_test4 topic topic-haha1 發送消息
* 注意namespace需手動先建立好,否則會報錯 olicies not found for sample/namespace_test4 namespace
*/
@Test
public void testProduce322() throws Exception {
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("persistent://sample/namespace_test4/topic-haha1")
.enableBatching(false)
.create();
producer.send("向租戶sample 命名空間 namespace_test2 topic topic-haha1 發送消息");
System.in.read();
}
這裡則表示我們需要先建立namespace之後,搞好對應的namespace tenant這些之後才行
那麼如何動态去建立namespace,管理tenanat,以及包括我們剛才搞了那麼多的生産者消費者測試出來,
能不能有一個UI界面讓我一目了然,一手掌握Pulsar呢?
這裡我即将介紹Pulsar的一款UI工具Pulsar admin
敬請期待,持續更新
連結:https://juejin.cn/post/7220715429244469308