天天看點

5.RocketMQ 用戶端開發1. 引入依賴2. 發送消息3. Push方式消費消息4. Pull方式消費消息

文章目錄

  • 1. 引入依賴
  • 2. 發送消息
    • 2.1 發送同步消息
    • 2.2 發送異步消息
    • 2.3 發送單向消息
    • 2.4 發送順序消息
  • 3. Push方式消費消息
  • 4. Pull方式消費消息

1. 引入依賴

<dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.4.0</version>
    </dependency>
           
注意,這兒的jar包的版本号必須和rocket的版本号完全一緻,否則可能出現各種意向不到的問題,如果發現例子無法正常運作,請優先檢查版本号

2. 發送消息

2.1 發送同步消息

//使用組名來初始化一個生産者
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // 指定Server Name 位址.
    producer.setNamesrvAddr("localhost:9876");
    //啟動執行個體
    producer.start();
    for (int i = 0; i < 100; i++) {
        //建立消息執行個體,指定 topic, tag 和 消息内容
        byte[] msgBytes =  ("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET);
        Message msg = new Message("TopicTest","TagA" ,msgBytes);
        // 發送消息并擷取發送結果
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
    }
    //生産者執行個體不在使用時進行關閉
    producer.shutdown();
           

2.2 發送異步消息

吐槽官網,官網的例子基本無法運作,因為 在異步等待響應結果的時候,生産者執行個體被關閉了。。解決方案 注釋掉不關閉

// 使用組名執行個體化生産者。
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // 指定NameServer 位址
    producer.setNamesrvAddr("localhost:9876");
    // 啟動執行個體
    producer.start();
    producer.setRetryTimesWhenSendAsyncFailed(0);
    for (int i = 0; i < 100; i++) {
        final int index = i;
        //建立消息執行個體,指定 topic, tag 和 消息内容.
        byte[] msgBytes =  ("Hello world " +i).getBytes(RemotingHelper.DEFAULT_CHARSET);
        Message msg = new Message("TopicTest","TagA","OrderID188",msgBytes);
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());
            }
            @Override
            public void onException(Throwable e) {
                System.out.printf("%-10d Exception %s %n", index, e);
                e.printStackTrace();
            }
        });
    }

    // 生産者執行個體不在使用時進行關閉.
    // producer.shutdown();
           

2.3 發送單向消息

// 使用組名執行個體化生産者 .
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // 指定NameServer 位址.
    producer.setNamesrvAddr("localhost:9876");
    // 啟動執行個體.
    producer.start();
    for (int i = 0; i < 100; i++) {
        // 建立消息執行個體,指定 topic, tag 和 消息内容.
        byte[] msgBytes =  ("Hello world " +i).getBytes(RemotingHelper.DEFAULT_CHARSET);
        Message msg = new Message("TopicTest","TagA",msgBytes );
        // 發送消息.
        producer.sendOneway(msg);

    }
    // 生産者執行個體不在使用時進行關閉.
    producer.shutdown();
    
           

2.4 發送順序消息

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    producer.setNamesrvAddr("localhost:9876");
    producer.start();
    String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
    for (int i = 0; i < 100; i++) {
        int orderId = i % 10;
        byte[] msgBytes =  ("Hello world " +i).getBytes(RemotingHelper.DEFAULT_CHARSET);
        Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,msgBytes);
        SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                return mqs.get(index);
            }
        }, orderId);

        System.out.printf("%s%n", sendResult);
    }

    producer.shutdown();
           

3. Push方式消費消息

// 用組名執行個體化消費者.
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

    // 指定NameServer 位址.
    consumer.setNamesrvAddr("localhost:9876");

    // 訂閱一個Topic.
    consumer.subscribe("TopicTest", "*");
    // 注冊回掉函數,從Broker中取到消息時會調用.
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    // 啟動消費者執行個體.
    consumer.start();
    System.out.printf("Consumer Started.%n");
           

4. Pull方式消費消息

// 用組名執行個體化消費者.
    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("sumanGroup");

    // 指定NameServer 位址.
    consumer.setNamesrvAddr("localhost:9876");
    consumer.start();

    Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues("TopicTest");
    MessageQueue[] messageQueueArr = messageQueues.toArray(new MessageQueue[]{});
    long offset = consumer.fetchConsumeOffset(messageQueueArr[0],true);
    System.out.println(offset);
    PullResult pull = consumer.pullBlockIfNotFound(messageQueueArr[0], "*", offset, 5);
    pull.getMsgFoundList().forEach(item->System.out.println(new String(item.getBody())));
    consumer.updateConsumeOffset(messageQueueArr[0],pull.getNextBeginOffset());
    consumer.getOffsetStore().persist(messageQueueArr[0]);

    /* 啟動消費者執行個體. */


    System.out.printf("Consumer Started.%n");