[SpringCloudAlibaba] RocketMQ Quick Start: Using the API

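Author: Java合集

Preface

Although Spring Boot dominates development today, it is still worth learning the raw API: it deepens your understanding of RocketMQ and makes debugging easier when something goes wrong. (Anyone can use a Template, right?)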

Start

Dependency

<dependency> 
    <groupId>org.apache.rocketmq</groupId> 
    <artifactId>rocketmq-client</artifactId> 
    <version>4.7.1</version>
</dependency> 

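Synchronous messages

Synchronous message: after sending a message, the producer must wait for the broker's response before continuing with its business logic. Until the broker responds, the producer blocks.

Producer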

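import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class SyncProducer {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group01");
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        for (int i = 0; i < 100; ++i) {
            Message message = new Message("TopicTest", "TagA", ("Message" + i).getBytes(StandardCharsets.UTF_8));
            // send() blocks until the broker responds or the send times out.
            SendResult sendResult = producer.send(message);
            System.out.printf("send %s%n", new String(message.getBody(), StandardCharsets.UTF_8));
            System.out.printf("get result %s%n", sendResult);
        }
        producer.shutdown();
    }
}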

Consumer

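import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class SyncConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group01");
        consumer.setNamesrvAddr("192.168.23.128:9876");
        // "*" subscribes to every tag of the topic.
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive Messages : %s\n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started");
    }
}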

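Asynchronous messages

Asynchronous message: after sending a message, the producer continues with its business logic without waiting for the broker's response. It only needs to supply a callback.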

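import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class AsyncProducer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group02");
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        // Asynchronous sends have their own retry setting; disable retries for this demo.
        producer.setRetryTimesWhenSendAsyncFailed(0);
        int count = 100;
        // The latch lets main() wait until every callback has fired.
        final CountDownLatch countDownLatch = new CountDownLatch(count);
        for (int i = 0; i < count; ++i) {
            try {
                Message message = new Message("TopicTest", "TagA", "key", ("hello" + i).getBytes(StandardCharsets.UTF_8));
                final int index = i;
                producer.send(message, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%s get success result for message : %d\n", Thread.currentThread().getName(), index);
                    }

                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%s get error result for message : %d\n", Thread.currentThread().getName(), index);
                    }
                });
            } catch (RemotingException | InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        // Wait at most 5 seconds for the callbacks before shutting down.
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}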

One-way messages

The producer neither waits for the broker's response nor registers a callback.

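import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class OneWayProducer {

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group01");
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        for (int i = 0; i < 100; ++i) {
            Message message = new Message("TopicTest", "TagA", ("Hello" + i).getBytes(StandardCharsets.UTF_8));
            // Fire and forget: no send result is returned.
            producer.sendOneway(message);
        }
        // Give the client time to flush the messages before shutting down.
        Thread.sleep(5000);
        producer.shutdown();
    }
}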

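Ordered messages

Partially ordered messages

The consumer consumes the messages of a given queue within a topic in order. This is implemented with MessageListenerOrderly.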

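import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class OrderConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group01");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setNamesrvAddr("192.168.23.128:9876");
        consumer.subscribe("TopicTest", "*");
        // MessageListenerOrderly consumes each queue sequentially.
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    System.out.println("receive message : " + new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer start");
    }
}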

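Globally ordered messages

Every message in the topic is consumed in order. This is only possible when the topic has a single queue, so it is rarely used and performs poorly.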
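The ordered consumer above only sees messages in order within one queue, so the producer has to route related messages to the same queue. In rocketmq-client this is usually done by passing a MessageQueueSelector to send. The sketch below shows only the routing arithmetic in plain Java so that it runs without a broker. OrderRouting, queueIndexFor, the order IDs and the queue count of 4 are hypothetical and chosen for illustration.

```java
class OrderRouting {

    // Map a business key (for example an order ID) to a queue index.
    // floorMod keeps the result non-negative even when hashCode() is negative.
    static int queueIndexFor(String orderId, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        return Math.floorMod(orderId.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues in the topic
        String[] orderIds = {"order-1001", "order-1002", "order-1001", "order-1003"};
        for (String orderId : orderIds) {
            // The same order ID always lands on the same queue.
            System.out.printf("%s -> queue %d%n", orderId, queueIndexFor(orderId, queueCount));
        }
        // With a single queue every key maps to index 0: the global-ordering case.
        System.out.println("single queue -> " + queueIndexFor("order-1001", 1));
    }
}
```

Messages that share a key keep their relative order, while different keys can still be spread over several queues. That is why partial ordering scales better than a single-queue topic.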

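Unordered consumption

This is implemented with MessageListenerConcurrently, as already shown in the synchronous message example above.

Broadcast messages

Consumer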

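import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class BroadcastConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group01");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setNamesrvAddr("192.168.23.128:9876");
        // BROADCASTING delivers every message to every consumer instance in the group.
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt message : msgs) {
                    System.out.println("receive message : " + new String(message.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}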

Producer

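import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class BroadcastProducer {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group01");
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        // Nothing broadcast-specific here: the message model is set on the consumer.
        for (int i = 0; i < 100; i++) {
            Message message = new Message("TopicTest", "TagA", "OrderId", ("hello " + i).getBytes(StandardCharsets.UTF_8));
            producer.send(message);
        }
        producer.shutdown();
    }
}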

Delayed messages

Producer

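import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class ScheduledProducer {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group01");
        // Same name server as in the other examples; the original listing omitted it.
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        for (int i = 0; i < 100; ++i) {
            Message message = new Message("TopicTest", ("hello " + i).getBytes(StandardCharsets.UTF_8));
            // Delay level 3 of the levels listed below.
            message.setDelayTimeLevel(3);
            producer.send(message);
        }
        producer.shutdown();
    }
}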

Consumer

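import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class ScheduledConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group01");
        // Same name server as in the other examples; the original listing omitted it.
        consumer.setNamesrvAddr("192.168.23.128:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt message : msgs) {
                    // TODO: consume the message

                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}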

RocketMQ defines 18 delay levels:

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
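setDelayTimeLevel takes the 1-based position in this list, so level 3 in the producer above means a 10-second delay. A small lookup in plain Java makes the mapping explicit. DelayLevels and toSeconds are hypothetical helpers, not part of rocketmq-client, and the table assumes the broker still uses the default levels listed above.

```java
class DelayLevels {

    // Default levels as listed above; array index 0 corresponds to level 1.
    private static final String[] LEVELS = {
        "1s", "5s", "10s", "30s", "1m", "2m", "3m", "4m", "5m",
        "6m", "7m", "8m", "9m", "10m", "20m", "30m", "1h", "2h"
    };

    // Convert a delay level (1..18) to its delay in seconds.
    static long toSeconds(int level) {
        if (level < 1 || level > LEVELS.length) {
            throw new IllegalArgumentException("level must be between 1 and " + LEVELS.length);
        }
        String spec = LEVELS[level - 1];
        long value = Long.parseLong(spec.substring(0, spec.length() - 1));
        char unit = spec.charAt(spec.length() - 1);
        switch (unit) {
            case 's': return value;
            case 'm': return value * 60;
            case 'h': return value * 3600;
            default: throw new IllegalStateException("unknown unit: " + unit);
        }
    }

    public static void main(String[] args) {
        System.out.println("level 3 -> " + toSeconds(3) + " seconds");
        System.out.println("level 18 -> " + toSeconds(18) + " seconds");
    }
}
```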

Batch messages

Send a collection of Message objects in a single call.

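import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class BatchProducer {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group01");
        // Same name server as in the other examples; the original listing omitted it.
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        String topic = "TopicTest";
        List<Message> messages = new LinkedList<>();
        messages.add(new Message(topic, "TagA", "Order01", "order01".getBytes(StandardCharsets.UTF_8)));
        messages.add(new Message(topic, "TagA", "Order02", "order02".getBytes(StandardCharsets.UTF_8)));
        messages.add(new Message(topic, "TagA", "Order03", "order03".getBytes(StandardCharsets.UTF_8)));
        messages.add(new Message(topic, "TagA", "Order04", "order04".getBytes(StandardCharsets.UTF_8)));
        // The whole list is sent as one batch.
        producer.send(messages);
        producer.shutdown();
    }
}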

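However, when deploying the broker we came across the parameter maxMessageSize=4194304 (4 MiB), which is the maximum size of a batch. A batch larger than this should be split and sent as several smaller batches.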
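One way to split an oversized batch is to walk the list and start a new sub-list whenever the running size would pass the limit. The following is a minimal sketch in plain Java that works on raw message bodies so it runs without rocketmq-client. BatchSplitter, split and the fixed OVERHEAD of 20 bytes per message are assumptions for illustration. A real size estimate would also have to account for the topic name and message properties.

```java
import java.util.ArrayList;
import java.util.List;

class BatchSplitter {

    // Assumed fixed per-message overhead in bytes; illustrative only.
    static final int OVERHEAD = 20;

    // Partition bodies into consecutive batches whose estimated size stays within maxBatchBytes.
    static List<List<byte[]>> split(List<byte[]> bodies, int maxBatchBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentSize = 0;
        for (byte[] body : bodies) {
            int size = body.length + OVERHEAD;
            // Close the current batch when adding this message would exceed the limit.
            if (!current.isEmpty() && currentSize + size > maxBatchBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            // A single message larger than the limit still gets its own batch;
            // the caller has to decide how to handle it.
            current.add(body);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<byte[]> bodies = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            bodies.add(new byte[80]); // each message counts as 100 bytes with the overhead
        }
        List<List<byte[]>> batches = split(bodies, 350);
        System.out.println("number of batches: " + batches.size());
    }
}
```

Each resulting sub-list would then be turned into Message objects and passed to producer.send(...) as its own batch.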

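Transactional messages

Transactional messages ensure that executing the local transaction and sending the message happen atomically. A transactional message has three states:

  1. TransactionStatus.CommitTransaction: the transaction is committed and the message may be consumed (LocalTransactionState.COMMIT_MESSAGE in the Java client).
  2. TransactionStatus.RollbackTransaction: the transaction is rolled back and the message is deleted (LocalTransactionState.ROLLBACK_MESSAGE).
  3. TransactionStatus.Unknown: the state is undetermined, and the broker has to check back with the producer to resolve it (LocalTransactionState.UNKNOW).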

Transaction message flow

[Figure: transaction message flow]

Producer

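import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionProducer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListener() {
            // Called right after the half message has been sent: run the local transaction here.
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                String tags = msg.getTags();
                if (StringUtils.contains(tags, "TagA")) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (StringUtils.contains(tags, "TagB")) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else {
                    return LocalTransactionState.UNKNOW;
                }
            }

            // Called by the broker to resolve messages whose state is still UNKNOW.
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                String tags = msg.getTags();
                if (StringUtils.contains(tags, "TagC")) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (StringUtils.contains(tags, "TagD")) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else {
                    return LocalTransactionState.UNKNOW;
                }
            }
        };

        TransactionMQProducer producer = new TransactionMQProducer("producer-group01");
        // The original listing called setNamespace here; the name server address is what is needed.
        producer.setNamesrvAddr("192.168.23.128:9876");
        // Thread pool used to run the broker's check-back requests.
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadFactory() {

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-thread");
                return thread;
            }
        });
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            Message message = new Message("TopicTest", tags[i % tags.length], "Key" + i, ("hello " + i).getBytes(StandardCharsets.UTF_8));
            producer.sendMessageInTransaction(message, null);
        }
        // Keep the producer alive so the broker can call checkLocalTransaction for messages
        // left in UNKNOW. The one-minute wait is arbitrary; tune it to the broker's check interval.
        Thread.sleep(60_000);
        producer.shutdown();
    }
}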

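Notes

  • Transactional messages do not support scheduled (delayed) or batch sending.
  • To prevent a single message from being checked too many times and causing half messages to pile up, the number of checks for a single message is limited to 15 by default. This limit can be changed with transactionCheckMax. Once it is exceeded, the broker discards the message and writes an error log. This behavior can be customized by overriding AbstractTransactionCheckListener.
  • A transactional message is checked after a certain period of time, determined by transactionTimeout. This period can also be changed per message by setting the user property CHECK_IMMUNITY_TIME_IN_SECONDS when sending the transactional message.
  • A transactional message may be checked or consumed more than once.
  • Reputting a committed message to the user's target topic may fail. At present this depends on the log record. High availability is ensured by RocketMQ's own high-availability mechanism. To make sure transactional messages are not lost and transaction integrity is preserved, the synchronous double-write mechanism is recommended.
  • The producer ID of transactional messages cannot be shared with the producer ID of other message types. Unlike other message types, transactional messages allow backward queries: the MQ server looks up the client by its producer ID.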
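
The check-related notes above can be pictured with a toy model in plain Java. This is only a sketch of the behavior the notes describe: re-check while the state is unknown, then give up after a maximum number of checks. It is not broker code, and TransactionCheckModel, LocalState, Outcome and resolve are hypothetical names. The limit of 15 mirrors the default mentioned above.

```java
import java.util.function.Supplier;

class TransactionCheckModel {

    enum LocalState { COMMIT, ROLLBACK, UNKNOWN }

    enum Outcome { DELIVERED, DELETED, DISCARDED }

    // Resolve a half message: keep asking the checker while the state is UNKNOWN,
    // but never more than maxChecks times.
    static Outcome resolve(LocalState initial, Supplier<LocalState> checker, int maxChecks) {
        LocalState state = initial;
        int checks = 0;
        while (state == LocalState.UNKNOWN && checks < maxChecks) {
            state = checker.get();
            checks++;
        }
        switch (state) {
            case COMMIT: return Outcome.DELIVERED;   // message becomes visible to consumers
            case ROLLBACK: return Outcome.DELETED;   // message is removed
            default: return Outcome.DISCARDED;       // still unknown after maxChecks checks
        }
    }

    public static void main(String[] args) {
        // A checker that answers UNKNOWN twice and then COMMIT.
        int[] calls = {0};
        Supplier<LocalState> eventuallyCommits =
            () -> ++calls[0] < 3 ? LocalState.UNKNOWN : LocalState.COMMIT;
        System.out.println(resolve(LocalState.UNKNOWN, eventuallyCommits, 15));

        // A checker that never decides runs into the check limit.
        System.out.println(resolve(LocalState.UNKNOWN, () -> LocalState.UNKNOWN, 15));
    }
}
```

The first call ends in DELIVERED after three checks. The second ends in DISCARDED, which is the case the check limit exists to handle.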