天天看點

[SpringCloudAlibaba]RocketMq快速入門之Api使用

作者:Java合集

前言

雖然現在是SpringBoot開發統領的時代,但還是要學習原始的Api使用,一方面有助于加深對RocketMq的了解,另一方面友善在出現問題時調試代碼。(Template誰不會用?對吧?)

Start

Dependency

<dependency> 
    <groupId>org.apache.rocketmq</groupId> 
    <artifactId>rocketmq-client</artifactId> 
    <version>4.7.1</version>
</dependency> 
複制代碼           

同步消息

同步消息:生産者發送消息之後,必須等待broker傳回資訊之後才繼續業務邏輯,在broker傳回資訊之前,生産者保持阻塞等待狀态

生産者

public class SyncProducer {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group01");
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        for (int i = 0; i < 100; ++i) {
            Message message = new Message("TopicTest", "TagA", ("Message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(message);
            System.out.printf("send %s", new String(message.getBody()));
            System.out.println();
            System.out.printf("get result %s", sendResult.toString());
        }
        producer.shutdown();
    }
}
複制代碼           

消費者

public class SyncConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group01");
        consumer.setNamesrvAddr("192.168.23.128:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive Messages : %s\n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started");
    }
}
複制代碼           

異步消息

異步消息:生産者提供消息之後,不需要等待broker回報即可繼續執行業務邏輯,僅需要提供一個回調函數

public class AsyncProducer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group02");
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        producer.setRetryTimesWhenSendFailed(0);
        int count = 100;
        final CountDownLatch countDownLatch = new CountDownLatch(count);
        for (int i = 0; i < count; ++i) {
            try {
                Message message = new Message("TopicTest", "TagA", "key", ("hello" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                final int index = i;
                producer.send(message, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%s get success result for message : %d\n", Thread.currentThread().getName(), index);
                    }

                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%s get error result for message : %d\n", Thread.currentThread().getName(), index);
                    }
                });
            } catch (UnsupportedEncodingException | RemotingException | InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}
複制代碼           

單向消息

不需要等待broker回報,也不需要回調

public class OneWayProducer {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group01");
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        for (int i = 0; i < 100; ++i) {
            Message message = new Message("TopicTest", "TagA", ("Hello" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.sendOneway(message);
        }
        Thread.sleep(5000);
        producer.shutdown();
    }
}
複制代碼           

順序消息

局部順序消息

:指消費者消費某個topic中的某個隊列時是順序消費。使用MessageListenerOrderly實作.

public class OrderConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group01");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setNamesrvAddr("192.168.23.128:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    System.out.println("receive message : " + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer start");
    }
}
複制代碼           

全局順序消息

消費者消費全部消息都是順序的,隻能通過某topic隻有一個隊列實作,使用場景少,性能差。

亂序消費

使用MessageListenerConcurrently實作,我們在上面同步消息已經示範過

廣播消息

消費者

public class BroadcastConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group01");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setNamesrvAddr("192.168.23.128:9876");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt message : msgs) {
                    System.out.println("receive message : " + new String(message.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
複制代碼           

生産者

public class BroadcastProducer {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group01");
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        for (int i = 0; i < 100; i++) {
            Message message = new Message("TopicTest", "TagA", "OrderId", ("hello " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.send(message);
        }
        producer.shutdown();
    }
}
複制代碼           

延遲消息

生産者

public class ScheduledProducer {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group01");
        producer.start();
        for (int i = 0; i < 100; ++i) {
            Message message = new Message("TopicTest", ("hello " + i).getBytes(StandardCharsets.UTF_8));
            message.setDelayTimeLevel(3);
            producer.send(message);
        }
        producer.shutdown();
    }
}
複制代碼           

消費者

public class ScheduledConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group01");
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt message : msgs) {
                    // TODO: 消費消息
                    
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
複制代碼           

RocketMQ設計了18個延遲等級

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

批量消息

發送一個Message集合

public class BatchProducer {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group01");
        producer.start();
        String topic = "TopicTest";
        List<Message> messages = new LinkedList<>();
        messages.add(new Message(topic, "TagA", "Order01", "order01".getBytes(StandardCharsets.UTF_8)));
        messages.add(new Message(topic, "TagA", "Order02", "order02".getBytes(StandardCharsets.UTF_8)));
        messages.add(new Message(topic, "TagA", "Order03", "order03".getBytes(StandardCharsets.UTF_8)));
        messages.add(new Message(topic, "TagA", "Order04", "order04".getBytes(StandardCharsets.UTF_8)));
        producer.send(messages);
        producer.shutdown();
    }
}
複制代碼           

但是,我們之前在部署broker時,會注意到一個參數,maxMessageSize=4194304,這是批量消息的最大大小。如果超過這個值,應采用分批發送.

事務消息

: 事務消息確定本地事務的執行和消息的發送可以原子地進行。 事務消息三種狀态:

  1. TransactionStatus.CommitTransaction:事務送出,允許消費
  2. TransactionStatus.RollbackTransaction:事務復原,消息删除
  3. TransactionStatus.Unknown:需要MQ回查确定狀态

事務消息流程

[SpringCloudAlibaba]RocketMq快速入門之Api使用

生産者

public class TransactionProducer {

    public static void main(String[] args) throws MQClientException {
        TransactionListener transactionListener = new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                String tags = msg.getTags();
                if (StringUtils.contains(tags, "TagA")) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (StringUtils.contains(tags, "TagB")) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else {
                    return LocalTransactionState.UNKNOW;
                }
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                String tags = msg.getTags();
                if (StringUtils.contains(tags, "TagC")) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (StringUtils.contains(tags, "TagD")) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else {
                    return LocalTransactionState.UNKNOW;
                }
            }
        };

        TransactionMQProducer producer = new TransactionMQProducer("producer-group01");
        producer.setNamespace("192.168.23.128:9876");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadFactory() {

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-thread");
                return thread;
            }
        });
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            Message message = new Message("TopicTest", tags[i % tags.length], "Key" + i, ("hello " + i).getBytes());
            producer.sendMessageInTransaction(message, null);
        }
        producer.shutdown();
    }
}
複制代碼           

注意事項

  • 事務消息沒有排程和批處理主持
  • 為避免單條消息被檢查次數過多,導緻半消息堆積,我們預設将單條消息的檢查次數限制為15次,可以通過transactionCheckMax更改。一旦超過,broker會丢棄該消息,并給出錯誤日志,可以通過重寫AbstractTransactionCheckListener來定制
  • 事務消息在一定時間後檢查,由transactionTimeout确定,也可以在發送事務消息時設定CHECK_IMMUNITY_TIME_IN_SECONDS的使用者屬性來改變。
  • 一個事務消息可能會被檢查或者消費不止一次
  • 送出給⽤戶⽬标主題的消息reput可能會失敗。⽬前,它取決于⽇志記錄。 ⾼可⽤是由 RocketMQ 本身的⾼可⽤機制來保證的。如果要保證事務消息不 丢失,保證事務完整性,推薦使⽤同步雙寫機制。
  • 事務性消息的生産者 ID 不能與其他類型消息的⽣産者 ID 共享。與其他類型 的消息不同,事務性消息允許向後查詢。MQ 伺服器通過其生産者 ID 查詢客 戶端