目录
Subscribe
Publish
Pubsub
Unaubscribe
Psubscribe
Punsubscribe
Java实现发布订阅
发布者
订阅者
测试类
toBePublisher
toBeSubscriber
Subscribe
参数格式:subscribe [channel,频道名称]
Publish
参数格式:publis [channel,频道名称] [message]
Pubsub
pubsub channels :查看活跃的订阅频道
Unaubscribe
参数格式:unaubscribe [channel,频道名称]
Psubscribe
Redis Psubscribe 命令订阅一个或多个符合给定模式的频道。举例:每个模式以 * 作为匹配符,比如 it* 匹配所有以 it 开头的频道( it.news 、 it.blog 、 it.tweets 等等)。 news.* 匹配所有以 news. 开头的频道( news.it 、 news.global.today 等等),诸如此类。
psubscribe my*
Punsubscribe
退订指定个地频道,参考Psubscribe和Unsubscribe。
Java实现发布订阅
jedisPool类就不贴了,核心入下,具体见本系列JedisPool连接池部分
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(MAX_ACTIVE);
config.setMaxIdle(MAX_IDLE);
config.setMaxWaitMillis(MAX_WAIT);
jedisPool = new JedisPool(config, HOST, PORT, TIMEOUT);
发布者
public class Publisher implements Runnable {
private final JedisPool jedisPool;
public Publisher(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
@Override
public void run() {
// TODO Auto-generated method stub
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));System.err.println("is running");
Jedis jedis = jedisPool.getResource(); //连接池中取出一个连接
while (true) {
String line = null;
try {
line = reader.readLine();
if (!"quit".equals(line)) {
System.err.println("输入一次");
jedis.publish("broadcast_channel", line); //从 mychannel 的频道上推送消息
} else {
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
订阅者
为了美观只重写三个方法
/*
监听到订阅模式接受到消息时的回调 (onPMessage)
监听到订阅频道接受到消息时的回调 (onMessage )
订阅频道时的回调( onSubscribe )
取消订阅频道时的回调( onUnsubscribe )
订阅频道模式时的回调 ( onPSubscribe )
取消订阅模式时的回调( onPUnsubscribe )
* */
public class Subscriber extends JedisPubSub{
@Override
public void onMessage(String channel, String message) {
// TODO Auto-generated method stub
System.err.println("收到来自" + channel + "的新消息:" + message);
}
@Override
public void onPMessage(String pattern, String channel, String message) {
// TODO Auto-generated method stub
System.err.println("收到来自" + channel + "的新消息:" + message);
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
System.err.println("恭喜你订阅:" + channel + "成功!");
}
}
测试类
public class Doit {
public void toBePublisher() throws InterruptedException {
String channel = "broadcast_channel";
Jedis jedis = new Jedis("localhost");
JedisPool jedisPool = JedisPoolDemo.initJedisPool();
Publisher publisher = new Publisher(jedisPool);
publisher.run();
// jedis.publish(channel, "hello world , broadcast_channel");
}
public void toBeSubscriber() {
String channel = "broadcast_channel";
Jedis jedis = new Jedis("localhost");
JedisPool jedisPool = JedisPoolDemo.initJedisPool();
Subscriber subscriber = new Subscriber();
jedis.subscribe(subscriber, channel);
}
public static void main(String[] args) throws InterruptedException {
new Doit().toBeSubscriber();
}
}
toBePublisher
toBeSubscriber
注意到返回(integer2)为两个订阅者:我额外开了一个命令窗口订阅挂着。
😘如果对你有所帮助点个赞,给点小动力,谢谢啦~