概述
官方推薦的是一個系統就用一個topic,然後系統下的類目就用tag來區分,比如說訂單系統,整個訂單就放到一個topic下,是下單還是支付還是退款等等發消息的時候通過不同的tag來區分.
broker再推送消息給consumer的時候就提前通過tag來判斷是否推送到這個consumer了,如果你這個consumer沒配置這個tag,那麼broker就不會把這個tag推送給你.這樣的好處就可以減少網絡IO.
SQL92文法
RocketMQ隻定義了一些基本文法來支援這個特性。你也可以很容易地擴充它。
- 數值比較,比如:>,>=,<,<=,BETWEEN,=;
- 字元比較,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 邏輯符号 AND,OR,NOT;
常量支援類型為:
- 數值,比如:123,3.1415;
- 字元,比如:‘abc’,必須用單引号包裹起來;
- NULL,特殊的常量
- 布爾值,TRUE 或 FALSE
使用注意!
隻有推模式的消費者可以使用SQL過濾。拉模式是用不了的。
代碼案例
生産者
package org.apache.rocketmq.example.filter;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class TagFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
producer.start();
//發送的時候設定tag
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 15; i++) {
Message msg = new Message("TagFilterTest",
tags[i % tags.length],
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
消費者
package org.apache.rocketmq.example.filter;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.IOException;
import java.util.List;
public class TagFilterConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
/* TagA || TagC ,意思是 隻有 TagA 或者 TagC 的消息會被接收 */
consumer.subscribe("TagFilterTest", "TagA || TagC");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
sql過濾
“TagA || TagC” 的寫法一個消息隻能有一個tag,如果想用多個tag來過濾的話,"TagA || TagC"這樣的寫法就做不到了.是以就使用sql過濾.
代碼示範
Linux配置
在broker.properties那裡配置,配置完了需要重新開機Broker叢集
#是否支援根據屬性過濾。如果使用基于标準的sql92模式過濾消息則改參數必須設定為true。
enablePropertyFilter=true
生産者
package org.apache.rocketmq.example.filter;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SqlFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 15; i++) {
// 發送消息的時候指定tag
Message msg = new Message("SqlFilterTest",
tags[i % tags.length],
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 設定使用者屬性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
消費者
package org.apache.rocketmq.example.filter;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class SqlFilterConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
//sql選擇器
/*下面的sql選擇器是 tags不為空,并且tags 是 'TagA' 或者'TagB' ,
并且MQ的UserProperty裡面的a屬性不能是null 并且 a屬性必須在 0~3範圍 , 包括0和3
*/
consumer.subscribe("SqlFilterTest",
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}