天天看点

5.RocketMQ 客户端开发1. 引入依赖2. 发送消息3. Push方式消费消息4. Pull方式消费消息

文章目录

  • 1. 引入依赖
  • 2. 发送消息
    • 2.1 发送同步消息
    • 2.2 发送异步消息
    • 2.3 发送单向消息
    • 2.4 发送顺序消息
  • 3. Push方式消费消息
  • 4. Pull方式消费消息

1. 引入依赖

<dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.4.0</version>
    </dependency>
           
注意,这儿的jar包的版本号必须和rocket的版本号完全一致,否则可能出现各种意向不到的问题,如果发现例子无法正常运行,请优先检查版本号

2. 发送消息

2.1 发送同步消息

//使用组名来初始化一个生产者
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // 指定Server Name 地址.
    producer.setNamesrvAddr("localhost:9876");
    //启动实例
    producer.start();
    for (int i = 0; i < 100; i++) {
        //创建消息实例,指定 topic, tag 和 消息内容
        byte[] msgBytes =  ("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET);
        Message msg = new Message("TopicTest","TagA" ,msgBytes);
        // 发送消息并获取发送结果
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
    }
    //生产者实例不在使用时进行关闭
    producer.shutdown();
           

2.2 发送异步消息

吐槽官网,官网的例子基本无法运行,因为 在异步等待响应结果的时候,生产者实例被关闭了。。解决方案 注释掉不关闭

// 使用组名实例化生产者。
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // 指定NameServer 地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    producer.setRetryTimesWhenSendAsyncFailed(0);
    for (int i = 0; i < 100; i++) {
        final int index = i;
        //创建消息实例,指定 topic, tag 和 消息内容.
        byte[] msgBytes =  ("Hello world " +i).getBytes(RemotingHelper.DEFAULT_CHARSET);
        Message msg = new Message("TopicTest","TagA","OrderID188",msgBytes);
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());
            }
            @Override
            public void onException(Throwable e) {
                System.out.printf("%-10d Exception %s %n", index, e);
                e.printStackTrace();
            }
        });
    }

    // 生产者实例不在使用时进行关闭.
    // producer.shutdown();
           

2.3 发送单向消息

// 使用组名实例化生产者 .
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // 指定NameServer 地址.
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例.
    producer.start();
    for (int i = 0; i < 100; i++) {
        // 创建消息实例,指定 topic, tag 和 消息内容.
        byte[] msgBytes =  ("Hello world " +i).getBytes(RemotingHelper.DEFAULT_CHARSET);
        Message msg = new Message("TopicTest","TagA",msgBytes );
        // 发送消息.
        producer.sendOneway(msg);

    }
    // 生产者实例不在使用时进行关闭.
    producer.shutdown();
    
           

2.4 发送顺序消息

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    producer.setNamesrvAddr("localhost:9876");
    producer.start();
    String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
    for (int i = 0; i < 100; i++) {
        int orderId = i % 10;
        byte[] msgBytes =  ("Hello world " +i).getBytes(RemotingHelper.DEFAULT_CHARSET);
        Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,msgBytes);
        SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                return mqs.get(index);
            }
        }, orderId);

        System.out.printf("%s%n", sendResult);
    }

    producer.shutdown();
           

3. Push方式消费消息

// 用组名实例化消费者.
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

    // 指定NameServer 地址.
    consumer.setNamesrvAddr("localhost:9876");

    // 订阅一个Topic.
    consumer.subscribe("TopicTest", "*");
    // 注册回掉函数,从Broker中取到消息时会调用.
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    // 启动消费者实例.
    consumer.start();
    System.out.printf("Consumer Started.%n");
           

4. Pull方式消费消息

// 用组名实例化消费者.
    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("sumanGroup");

    // 指定NameServer 地址.
    consumer.setNamesrvAddr("localhost:9876");
    consumer.start();

    Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues("TopicTest");
    MessageQueue[] messageQueueArr = messageQueues.toArray(new MessageQueue[]{});
    long offset = consumer.fetchConsumeOffset(messageQueueArr[0],true);
    System.out.println(offset);
    PullResult pull = consumer.pullBlockIfNotFound(messageQueueArr[0], "*", offset, 5);
    pull.getMsgFoundList().forEach(item->System.out.println(new String(item.getBody())));
    consumer.updateConsumeOffset(messageQueueArr[0],pull.getNextBeginOffset());
    consumer.getOffsetStore().persist(messageQueueArr[0]);

    /* 启动消费者实例. */


    System.out.printf("Consumer Started.%n");