天天看點

RocketMQ的消息過濾功能概念和demo

概述

官方推薦的是一個系統就用一個topic,然後系統下的類目就用tag來區分,比如說訂單系統,整個訂單就放到一個topic下,是下單還是支付還是退款等等發消息的時候通過不同的tag來區分.

broker再推送消息給consumer的時候就提前通過tag來判斷是否推送到這個consumer了,如果你這個consumer沒配置這個tag,那麼broker就不會把這個tag推送給你.這樣的好處就可以減少網絡IO.

SQL92文法

RocketMQ隻定義了一些基本文法來支援這個特性。你也可以很容易地擴充它。

  • 數值比較,比如:>,>=,<,<=,BETWEEN,=;
  • 字元比較,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 邏輯符号 AND,OR,NOT;

常量支援類型為:

  • 數值,比如:123,3.1415;
  • 字元,比如:‘abc’,必須用單引号包裹起來;
  • NULL,特殊的常量
  • 布爾值,TRUE 或 FALSE

使用注意!

隻有推模式的消費者可以使用SQL過濾。拉模式是用不了的。

代碼案例

生産者

package org.apache.rocketmq.example.filter;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class TagFilterProducer {

    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
        producer.start();
    //發送的時候設定tag
        String[] tags = new String[]{"TagA", "TagB", "TagC"};

        for (int i = 0; i < 15; i++) {
            Message msg = new Message("TagFilterTest",
                    tags[i % tags.length],
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();
    }
}      

消費者

package org.apache.rocketmq.example.filter;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.IOException;
import java.util.List;

public class TagFilterConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException, IOException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
        /*  TagA || TagC  ,意思是 隻有 TagA 或者 TagC 的消息會被接收 */
        consumer.subscribe("TagFilterTest", "TagA || TagC");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}      

sql過濾

“TagA || TagC” 的寫法一個消息隻能有一個tag,如果想用多個tag來過濾的話,"TagA || TagC"這樣的寫法就做不到了.是以就使用sql過濾.

代碼示範

Linux配置

在broker.properties那裡配置,配置完了需要重新開機Broker叢集

#是否支援根據屬性過濾。如果使用基于标準的sql92模式過濾消息則改參數必須設定為true。 
enablePropertyFilter=true      

生産者

package org.apache.rocketmq.example.filter;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SqlFilterProducer {

    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
        producer.start();
    
        String[] tags = new String[]{"TagA", "TagB", "TagC"};

        for (int i = 0; i < 15; i++) {
            // 發送消息的時候指定tag
            Message msg = new Message("SqlFilterTest",
                    tags[i % tags.length],
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // 設定使用者屬性
            msg.putUserProperty("a", String.valueOf(i));

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();
    }
}      

消費者

package org.apache.rocketmq.example.filter;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class SqlFilterConsumer {

    public static void main(String[] args) throws Exception {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");

        //sql選擇器
        /*下面的sql選擇器是 tags不為空,并且tags 是 'TagA' 或者'TagB' ,
        并且MQ的UserProperty裡面的a屬性不能是null 并且 a屬性必須在 0~3範圍 , 包括0和3
        */
        consumer.subscribe("SqlFilterTest",
                MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
                        "and (a is not null and a between 0 and 3)"));

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}      

檢視效果