天天看點

消息隊列Pulsar入門(一) 生産者/消費者/Topic詳解,附源碼示範

作者:Java小熊

前言

對于pulsar的特性以及優異,這裡不多講解,直接上幹貨,主要講一下Pulsar的docker部署,生産者/消費者幾種 
不同模式,以及Topic的使用規則           

Docker部署pulsar

docker run -it -p 80:80 -p 8080:8080 -p 6650:6650 -d apachepulsar/pulsar-standalone           

部署問題

因為我用的是騰訊雲最基礎的伺服器,在執行docker指令後,發現Pulsar會啟動失敗或啟動不久便停止,檢視日志發現是記憶體頂不住           
消息隊列Pulsar入門(一) 生産者/消費者/Topic詳解,附源碼示範
檢視官網Pulsar預設啟動是2g,是以把啟動配置修改成機器支援的即可;
docker exec -it pulsar-test sh
cd /pulsar/conf/
vim conf/pulsar_env.sh;       之後重新開機pulsar即可           
消息隊列Pulsar入門(一) 生産者/消費者/Topic詳解,附源碼示範
消息隊列Pulsar入門(一) 生産者/消費者/Topic詳解,附源碼示範

連接配接Pulsar

/**
 * pulsar 連接配接bean
 */
@Bean
public PulsarClient getPulsarClient() throws PulsarClientException {
    return PulsarClient.builder()
            .serviceUrl("pulsar://Ip位址:6650")
            .build();
}           

基礎概念了解

Produce 消息的源頭,也是消息的釋出者,負責将消息發送到 topic。

Consumer 消息的消費者,負責從 topic 訂閱并消費消息。

Topic 消息資料的載體,在 Pulsar 中 Topic 可以指定分為多個 partition,如果不設定預設隻有一個 partition
(這個指定多個partition,我會在文中後面示例示範,可以留意下)

Brkber 一個無狀态元件,主要負責接收 Producer 發送過來的消息,并傳遞給 Consumer,可以了解成送快遞的小哥           
消息隊列Pulsar入門(一) 生産者/消費者/Topic詳解,附源碼示範

Produce詳解

建立方式

簡單方法建立

Producer<String> stringProducer = client.newProducer(Schema.STRING)
        .topic("my-topic")
        .create();
    stringProducer.send("My message")           

loadConf自定義配置建立

config裡面可以填一些自定義配置,如sendTimeoutMs 消息發送逾時(毫秒)。如果在sendTimeout過期之前伺服器未确認消息,則會發生錯誤,其他有趣的可以看下官網           

Pulsar官網

/**
* 使用loadConf建立Produce
*/
@Test
public void testProducer() throws Exception {
   Map<String, Object> config1 = new HashMap<>();
   config1.put("producerName", "produce-demo1");
   config1.put("topicName", "topic1");
   Producer producer1 = client
           .newProducer()
           .loadConf(config1)
           .create();
   producer1.send(("test1 --- " + new Date()).getBytes());
}           

發送模式

同步發送

同步發送消息是Producer發送消息以後要等到broker的确認以後才會認為消息發送成功,如果沒有收到确認就認為消息發送失敗            
/**
 * 測試同步發送
 */
@Test
public void testProducer22() throws Exception {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("my-topic")
            .producerName("produce-demo1")
            .create();
    MessageId messageId = stringProducer.send("My message" + "發送消息時間" + new Date());
    System.out.println("消息同步發送---");
    System.in.read();
}
複制代碼           

異步發送

異步發送消息是 Producer 發送消息,将消息放到阻塞隊列中并立即傳回。不需要等待 broker 的确認           
/**
 * 測試異步發送
 */
@Test
public void testProducer222() throws Exception {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("my-topic")
            .producerName("produce-demo1")
            .create();
    CompletableFuture<MessageId> messageIdCompletableFuture = stringProducer.sendAsync(
            "異步發送的消息");
    System.in.read();
}           

通路方式/發送方式

Share模式(預設情況)

預設情況下多個生産者可以釋出消息到同一個Topic,指定發送模式.accessMode(ProducerAccessMode.Shared)方法           
/**
 * shard模式 預設情況下多個生産者可以釋出消息到同一個 Topic
 */
@Test
public void testProducer222() throws IOException {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .accessMode(ProducerAccessMode.Shared)
            .topic("通路模式-shared")
            .producerName("produce-demo1")
            .create();
    stringProducer.send("My message 1 " + "發送消息時間" + new Date());

    Producer<String> stringProducer2 = client
            .newProducer(Schema.STRING)
            .accessMode(ProducerAccessMode.Shared)
            .topic("通路模式-shared")
            // Producer with name 'produce-demo1' is already connected to topic
            //注意生産者名稱不能重複
            .producerName("produce-demo2")
            .create();
    stringProducer2.send("My message 2 " + "發送消息時間" + new Date());

    System.in.read();
}           

請注意:

這裡我特意标注了生産者名稱不能重複,否則對于Pulsar來說,發送消息會報錯,如下圖,已經有一個produce- 
  demo1的生産者了,再來一個就會報錯Producer with name 'produce-demo1' is already connected to topic
  是以如果我們是叢集部署的話,尤其注意每一個節點生産者的命名
  當然對于消費者也是同樣的規則,不允許名稱重複(在下文我也會示範到)           
/**
 * 示範生産者名稱重複,發送報錯
 */
@Test
public void testProducer1() throws IOException {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("通路模式-shared")
            .producerName("produce-demo1")
            .create();
    stringProducer.send("My message 1 " + "發送消息時間" + new Date());
    System.in.read();
}

/**
 * 示範生産者名稱重複,發送報錯
 */
@Test
public void testProducer11() throws IOException {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("通路模式-shared")
            .producerName("produce-demo1")
            .create();
    stringProducer.send("My message 1 " + "發送消息時間" + new Date());
    System.in.read();
}           
消息隊列Pulsar入門(一) 生産者/消費者/Topic詳解,附源碼示範

Exclusive

要求生産者以獨占模式通路 Topic,在此模式下如果 Topic已經有了生産者,那麼其他生産者在連接配接就會失敗報錯。            
/**
 * Exclusive 要求生産者以獨占模式通路 Topic,在此模式下 如果 Topic 已經有了生産者,那麼其他生産者在連接配接就會失敗報錯。
 * <p>
 * "Topic has an existing exclusive producer: standalone-0-12
 */
@Test
public void testProducer6() throws IOException {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("通路模式-Exclusive")
            //設定通路模式 預設shared
            .accessMode(ProducerAccessMode.Exclusive)
            .producerName("produce-demo1")
            .create();
    stringProducer.send("My message 1 " + "發送消息時間" + new Date());

    Producer<String> stringProducer2 = client
            .newProducer(Schema.STRING)
            .topic("通路模式-Exclusive")
            //設定通路模式 預設shared
            .accessMode(ProducerAccessMode.Exclusive)
            // Producer with name 'produce-demo1' is already connected to topic
            //注意生産者名稱不能重複
            .producerName("produce-demo2")
            .create();
    stringProducer2.send("My message 2 " + "發送消息時間" + new Date());

    System.in.read();
}           
消息隊列Pulsar入門(一) 生産者/消費者/Topic詳解,附源碼示範

WaitForExclusive

如果主題已經連接配接了生産者,則将目前生産者挂起,直到生産者獲得了Exclusive 通路權限。
該怎麼來了解這句話,打個不恰當比喻,類似于Java中的獨占鎖Sycronized一樣,你沒有擷取到鎖,沒有擷取到權限,就不能發消息,
對比Exclusive報錯來說,WaitForExclusive是不會報錯的,隻會是挂起,
來看下面的demo感受下

1 我們先開啟一個線程A向 通路模式-WaitForExclusive topic發送一條消息,My message 1 ***           
/**
 * WaitForExclusive
 * <p>
 * 如果主題已經連接配接了生産者,則将目前生産者挂起,直到生産者獲得了 Exclusive 通路權限。
 * <p>
 * 也就是存在相同的生産者,不會報錯,當然也不會發送消息,     擷取到獨占後,會将未擷取到獨占時的消息進行發送!!!
 */
@Test
public void testProducer2() throws Exception {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("通路模式-WaitForExclusive")
            //設定通路模式 預設shared
            .accessMode(ProducerAccessMode.WaitForExclusive)
            .producerName("produce-demo1")
            .create();
    stringProducer.send("My message 1 " + "發送消息時間" + new Date());
    System.in.read();
}           
2 然後再開啟另一個線程B向 通路模式-WaitForExclusive topic發送10條消息,My message 2 ***           
/**
 * WaitForExclusive
 */
@Test
public void testProducer22() throws Exception {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("通路模式-WaitForExclusive")
            //設定通路模式 預設shared
            .accessMode(ProducerAccessMode.WaitForExclusive)
            .producerName("produce-demo1")
            .create();
    //假設有10條消息在未擷取 獨占前,均未被發送,模拟來看一下,擷取獨占後, 這10條消息會進行發送嗎 ? 會
    for (int i = 0; i < 10; i++) {
        stringProducer.send("My message 2 " + "發送消息時間" + new Date());
    }
    System.in.read();
}           
3 然後寫個簡單的消費者看一下消費情況           
@Test
public void testConsumer2() throws IOException {
    MessageListener myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("通路模式-WaitForExclusive")
            .subscriptionName("my-subscription")
            .messageListener(myMessageListener)
            .subscribe()
    System.in.read();
}           
4 會看到消費者隻消費到了 線程A發送的消息,線程B的消息未被消費,因為此時topic的獨占權還線上程池A上           
消息隊列Pulsar入門(一) 生産者/消費者/Topic詳解,附源碼示範
5 手動殺死線程A,然後看消費者情況,會看到開始消費出My message 2 *** 也就是線程B的消息,
因為此時線程A被殺死,線程B得到了獨占權,線程B将消息發送出去            
消息隊列Pulsar入門(一) 生産者/消費者/Topic詳解,附源碼示範

Consumer詳解

建立方式

簡單方法建立

可以看到寫了一個while true去擷取消息,對于線城是阻塞不友好的,是以我一般用第二種,監聽器方法           
/**
 * 建立消費者
 */
@Test
public void testConsumer22() throws Exception{
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscribe();
    while (true) {
        // Wait for a message
        Message msg = consumer.receive();
        try {
            // Do something with the message
            System.out.println("Message received: " + new String(msg.getData()));
            // Acknowledge the message so that it can be deleted by the message broker
            consumer.acknowledge(msg);
        } catch (Exception e) {
            // Message failed to process, redeliver later
            consumer.negativeAcknowledge(msg);
        }
    }
}           

監聽器方法建立

/**
 * 接收消息:異步 不阻塞主線程
 */
@Test
public void testConsumer2() throws IOException {
    MessageListener myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .messageListener(myMessageListener)
            .subscribe();
    System.out.println("監聽器方式,不阻塞線程");
    System.in.read();
}           
消息隊列Pulsar入門(一) 生産者/消費者/Topic詳解,附源碼示範

loadConf自定義配置建立

更多自定義的配置可以看下官網檔案           
/**
 * loadConf建立消費者
 */
@Test
public void testConsumer222() throws IOException {
    MessageListener myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Map<String, Object> config1 = new HashMap<>();
    config1.put("subscriptionName", "consumer-demo1");
    config1.put("topicNames", Arrays.asList(new String[]{"my-topic"}));

    Consumer consumer = client
            .newConsumer()
            .loadConf(config1)
            .messageListener(myMessageListener)
            .subscribe();
    System.out.println("loadConf方式");
    System.in.read();
}           

多主題訂閱

多主題訂閱主要是指一個消費者,可以訂閱多個topic,這裡我隻示範其中兩個           

傳入List數組的多主題訂閱

/**
 * Multi-topic subscriptions
 * 多主題訂閱
 * 多topic 訂閱list設定的topic1 topic2
 */
@Test
public void testConsumer3() throws IOException {
    MessageListener myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };

    ConsumerBuilder consumerBuilder = client.newConsumer()
            .subscriptionName("consumer-3");
    List<String> topics = Arrays.asList(
            "topic1",
            "topic2"
    );
    Consumer multiTopicConsumer = consumerBuilder
            .topics(topics)
            .messageListener(myMessageListener)
            .subscribe();
    System.in.read();
}           
消息隊列Pulsar入門(一) 生産者/消費者/Topic詳解,附源碼示範

正規表達式多主題訂閱

簡單點就是正規表達式比對,根據業務需要自行設定表達式,這裡不多示範           
/**
 * Multi-topic subscriptions
 * 多主題訂閱
 * 正規表達式,訂閱所有以1結束的topic
 *
 */
@Test
public void testConsumer222() throws IOException {
    MessageListener myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    ConsumerBuilder consumerBuilder = client.newConsumer()
            .subscriptionName("consumer-1");
    Pattern allTopicsInNamespace = Pattern.compile("public/default/.*1");
    Consumer allTopicsConsumer = consumerBuilder
            .topicsPattern(allTopicsInNamespace)
            .messageListener(myMessageListener)
            .subscribe();
    System.in.read();
}           

消費模式

Exclusive(預設)

這裡需要注意的是同一topic主題上隻能有一個具有相同訂閱名稱的使用者 預設,也就是說 如果後端是叢集部署的話,請注意預設情況下subscriptionName的命名情況,否則會報錯           
/**
 * Exclusive 模式  也是預設的
 * 同一主題上隻能有一個具有相同訂閱名稱的使用者 預設
 * 否則會啟動報錯
 */
@Test
public void testConsumerExclusive() throws IOException {
    MessageListener myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription2")
            .subscriptionType(SubscriptionType.Exclusive)
            .messageListener(myMessageListener)
            .subscribe();

    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription2")
            .subscriptionType(SubscriptionType.Exclusive)
            .messageListener(myMessageListener)
            .subscribe();

    System.in.read();
}           
消息隊列Pulsar入門(一) 生産者/消費者/Topic詳解,附源碼示範

Failover

這個主要是失敗轉移,對比Exclusive模式,同一主題上可以有具有相同訂閱名稱的使用者,也就是subscriptionName可以重複,一個節點挂掉了 剩餘消息轉移到另一個節點繼續消費;
 這塊的業務場景挺不錯的,假設我們背景有兩台叢集部署機器A,B,并且subscriptionName相同,
 正常情況下,其他子產品往隊列仍了一條消息,但是隻希望被其中一台機器消費, 一條消息被消費一次,而不是A,B兩機器都消費對吧,正常的幂等性操作
 現在開始模拟,假設其他子產品發送了10條消息,然後隻被其中一台消費           
@Test
    public void testProduce2() throws PulsarClientException {
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("my-topic")
                .enableBatching(false)
                .create();
// 3 messages with "key-1", 3 messages with "key-2", 2 messages with "key-3" and 2 messages with "key-4"
        // 這裡的key可以類似于 投遞到 不同broker的一個辨別
        producer.newMessage().key("key-1").value("message-1-1").send();
        producer.newMessage().key("key-1").value("message-1-2").send();
        producer.newMessage().key("key-1").value("message-1-3").send();
        producer.newMessage().key("key-2").value("message-2-1").send();
        producer.newMessage().key("key-2").value("message-2-2").send();
        producer.newMessage().key("key-2").value("message-2-3").send();
        producer.newMessage().key("key-3").value("message-3-1").send();
        producer.newMessage().key("key-3").value("message-3-2").send();
        producer.newMessage().key("key-4").value("message-4-1").send();
        producer.newMessage().key("key-4").value("message-4-2").send();
    }
    
@Test
public void testConsumerFailover() throws IOException {
    MessageListener myMessageListener1 = (consumer, msg) -> {
        try {
           // a++;
          //  if (a > 4) {
               // System.out.println("模拟節點1故障");
                //關閉節點1
              //  consumer.close();
              //  throw new RuntimeException("模拟某時刻節點1故障,轉移至節點2消費");
           // }
            System.out.println("Message1 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Failover)
            .messageListener(myMessageListener1)
            .subscribe();

    MessageListener myMessageListener2 = (consumer2, msg) -> {
        try {
            System.out.println("Message2 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Failover)
            .messageListener(myMessageListener2)
            .subscribe();

    System.in.read();
}           
消息隊列Pulsar入門(一) 生産者/消費者/Topic詳解,附源碼示範
再來看看失敗轉移,假設其中一台機器當機,然後我希望剩下機器B,繼續消費未消費完的消息,可以看到一台機器模拟當機後,另一台機器繼續消費,也就是失敗轉移           
/**
 * Failover故障轉移 .subscriptionName("my-subscription") 可重複
 * 一個節點挂掉了 剩餘消息轉移到另一個節點繼續消費
 * 注意這些消費模式 都是和subscriptionName("my-subscription") 訂閱者名稱相關
 */
int a = 0;

@Test
public void testConsumerFailover() throws IOException {
    MessageListener myMessageListener1 = (consumer, msg) -> {
        try {
            a++;
            if (a > 4) {
                System.out.println("模拟節點1故障");
                //關閉節點1
                consumer.close();
                throw new RuntimeException("模拟某時刻節點1故障,轉移至節點2消費");
            }
            System.out.println("Message1 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            //consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Failover)
            .messageListener(myMessageListener1)
            .subscribe();

    MessageListener myMessageListener2 = (consumer2, msg) -> {
        try {
            System.out.println("Message2 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Failover)
            .messageListener(myMessageListener2)
            .subscribe();

    System.in.read();
}           
消息隊列Pulsar入門(一) 生産者/消費者/Topic詳解,附源碼示範

Shared

多個使用者将能夠使用相同的訂閱名稱,并且消息将根據連接配接的使用者之間的循環旋轉進行分派。 在這種模式下,消費順序不能保證
           
/**
 * Shared模式
 * 多個使用者将能夠使用相同的訂閱名稱,并且消息将根據連接配接的使用者之間的循環旋轉進行分派。 在這種模式下,消費順序不能保證。
 * 也就是消費者 1 消費者2 總共消費10條
 * 注意都是從 .subscriptionName("my-subscription") 視角
 */
@Test
public void testShared() throws IOException {
    MessageListener myMessageListener1 = (consumer, msg) -> {
        try {
            System.out.println("Message1 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .messageListener(myMessageListener1)
            .subscribe();

    MessageListener myMessageListener2 = (consumer2, msg) -> {
        try {
            System.out.println("Message2 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .messageListener(myMessageListener2)
            .subscribe();

    System.in.read();
}
           
消息隊列Pulsar入門(一) 生産者/消費者/Topic詳解,附源碼示範

Key_Shared模式

這個簡單來了解,發送消息的時候,給這批消息指定一個key,那麼消息被消費的時候,相同key的這批消息,隻能被同一個節點消費
如下示例我發送消息時,指定下key,然後寫消費者看下消費情況,會看到key相同的消息被同一節點消費
           
@Test
    public void testProduce2() throws PulsarClientException {
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("my-topic")
                .enableBatching(false)
                .create();
        producer.newMessage().key("key-1").value("message-1-1").send();
        producer.newMessage().key("key-1").value("message-1-2").send();
        producer.newMessage().key("key-1").value("message-1-3").send();
        producer.newMessage().key("key-2").value("message-2-1").send();
        producer.newMessage().key("key-2").value("message-2-2").send();
        producer.newMessage().key("key-2").value("message-2-3").send();
        producer.newMessage().key("key-3").value("message-3-1").send();
        producer.newMessage().key("key-3").value("message-3-2").send();
        producer.newMessage().key("key-4").value("message-4-1").send();
        producer.newMessage().key("key-4").value("message-4-2").send();
    }
    
/**
 * Key_Shared模式
 * 多個使用者将能夠使用相同的訂閱名稱,并且消息将根據連接配接的使用者之間的循環旋轉進行分派。 在這種模式下,消費順序不能保證。
 * 也就是消費者 1 消費者2 總共消費10條
 * 注意都是從 .subscriptionName("my-subscription") 視角
 * <p>
 * 具有相同密鑰的消息僅按順序傳遞給一個消費者。消息在不同消費者之間的可能分布(預設情況下,我們事先不知道哪些密鑰将被配置設定給消費者,但一個密鑰隻會同時被配置設定給消費者
 * ("key-1", "message-1-1")
 * ("key-1", "message-1-2")
 * ("key-1", "message-1-3")
 * ("key-3", "message-3-1")
 * ("key-3", "message-3-2")
 * <p>
 * <p>
 * ("key-2", "message-2-1")
 * ("key-2", "message-2-2")
 * ("key-2", "message-2-3")
 * ("key-4", "message-4-1")
 * ("key-4", "message-4-2")
 */
@Test
public void testKeyShared() throws IOException {
    MessageListener myMessageListener1 = (consumer, msg) -> {
        try {
            System.out.println("Message1 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Key_Shared)
            .messageListener(myMessageListener1)
            .subscribe();

    MessageListener myMessageListener2 = (consumer2, msg) -> {
        try {
            System.out.println("Message2 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Key_Shared)
            .messageListener(myMessageListener2)
            .subscribe();

    System.in.read();
}
    
           
消息隊列Pulsar入門(一) 生産者/消費者/Topic詳解,附源碼示範

模式對比

Exclusive隻支援同一topic隻能有一個同名訂閱者,對于目前大多叢集架構,需要每個節點命名subscriptionName不同操作下,
 叢集中的每個節點都能收到topic消息,對于特殊場景如 前端websocket連接配接背景叢集這類場景,還是蠻實用
 Failover:可以保證在叢集中消息隻被消費一次,幂等性嘛簡單點,正常情況下隻被其中一台機器消費,也就是固定一台機器,這種就很紗布了
 Shared: 可以保證在叢集中消息隻被消費一次,也是保證了幂等性,而且消息被叢集平均消費了,壓力down down
 Key_Shared 我再想想
 
           

Topic

Pulsar對topic的命名有如下規則,
{persistent|non-persistent}://tenant/namespace/topic
           
  • persistent / non-persistent 表示主題的類型,主題分為持久化和非持久化主題,預設是持久化的類型。持久化的主題會将消息儲存到磁盤上,而非持久化的主題就不會将消息儲存到磁盤。
  • tenant Pulsar 中主題的租戶,租戶對于 Pulsar中的多租戶至關重要,并且分布在叢集中。
  • namespace 将相關聯的 Topic 作為一個組來管理,是管理 Topic 的基本單元。每個租戶可以有一個或多個命名空間。
在上面的示例,我們都沒有去關注persistent,tenant,namespace的玩法,因為你不去特殊設定的話,pulsar都有預設的
           
我們可以嘗試往persistent://sample/namespace_test4/topic-haha1直接發一條消息,你會發現發送報錯Policies not found for sample/namespace_test4 namespace
           
/**
 * 報錯
 * 向租戶sample 命名空間 namespace_test4  topic topic-haha1 發送消息
 * 注意namespace需手動先建立好,否則會報錯 olicies not found for sample/namespace_test4 namespace
 */
@Test
public void testProduce322() throws Exception {
    Producer<String> producer = client.newProducer(Schema.STRING)
            .topic("persistent://sample/namespace_test4/topic-haha1")
            .enableBatching(false)
            .create();
    producer.send("向租戶sample 命名空間 namespace_test2  topic topic-haha1 發送消息");
    System.in.read();
}
           
消息隊列Pulsar入門(一) 生産者/消費者/Topic詳解,附源碼示範

這裡則表示我們需要先建立namespace之後,搞好對應的namespace tenant這些之後才行

那麼如何動态去建立namespace,管理tenanat,以及包括我們剛才搞了那麼多的生産者消費者測試出來,

能不能有一個UI界面讓我一目了然,一手掌握Pulsar呢?

這裡我即将介紹Pulsar的一款UI工具Pulsar admin

敬請期待,持續更新

連結:https://juejin.cn/post/7220715429244469308