天天看点

Redis(十)订阅和发布(sub/pub)以及Java实现代码SubscribePublishPubsubUnaubscribePsubscribe  PunsubscribeJava实现发布订阅

目录

Subscribe

Publish

Pubsub

Unaubscribe

Psubscribe  

Punsubscribe

Java实现发布订阅

发布者

订阅者

测试类

toBePublisher

toBeSubscriber

Subscribe

参数格式:subscribe [channel,频道名称]
Redis(十)订阅和发布(sub/pub)以及Java实现代码SubscribePublishPubsubUnaubscribePsubscribe  PunsubscribeJava实现发布订阅

Publish

参数格式:publis [channel,频道名称] [message]

Pubsub

pubsub channels :查看活跃的订阅频道 

Redis(十)订阅和发布(sub/pub)以及Java实现代码SubscribePublishPubsubUnaubscribePsubscribe  PunsubscribeJava实现发布订阅

Unaubscribe

参数格式:unaubscribe [channel,频道名称]

Psubscribe  

Redis Psubscribe 命令订阅一个或多个符合给定模式的频道。举例:每个模式以 * 作为匹配符,比如 it* 匹配所有以 it 开头的频道( it.news 、 it.blog 、 it.tweets 等等)。 news.* 匹配所有以 news. 开头的频道( news.it 、 news.global.today 等等),诸如此类。

psubscribe my*
           
Redis(十)订阅和发布(sub/pub)以及Java实现代码SubscribePublishPubsubUnaubscribePsubscribe  PunsubscribeJava实现发布订阅

Punsubscribe

退订指定个地频道,参考Psubscribe和Unsubscribe。

Java实现发布订阅

jedisPool类就不贴了,核心入下,具体见本系列JedisPool连接池部分

JedisPoolConfig config = new JedisPoolConfig();  
			 config.setMaxTotal(MAX_ACTIVE);  
			 config.setMaxIdle(MAX_IDLE);  
			 config.setMaxWaitMillis(MAX_WAIT);  
			 jedisPool = new JedisPool(config, HOST, PORT, TIMEOUT); 
           

发布者

public class Publisher implements Runnable {

	private final JedisPool jedisPool;

	public Publisher(JedisPool jedisPool) {
		this.jedisPool = jedisPool;
	}

	@Override
	public void run() {
		// TODO Auto-generated method stub
		BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));System.err.println("is running");
        Jedis jedis = jedisPool.getResource();   //连接池中取出一个连接
        while (true) {
            String line = null;
            try {
                line = reader.readLine();
                if (!"quit".equals(line)) {
                	System.err.println("输入一次");
                    jedis.publish("broadcast_channel", line);   //从 mychannel 的频道上推送消息
                } else {
                    break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
	}

}
           

订阅者

为了美观只重写三个方法

/*
 监听到订阅模式接受到消息时的回调 (onPMessage)
监听到订阅频道接受到消息时的回调 (onMessage )
订阅频道时的回调( onSubscribe )
取消订阅频道时的回调( onUnsubscribe )
订阅频道模式时的回调 ( onPSubscribe )
取消订阅模式时的回调( onPUnsubscribe )
 * */

public class Subscriber extends JedisPubSub{

	@Override
	public void onMessage(String channel, String message) {
		// TODO Auto-generated method stub
		System.err.println("收到来自" + channel + "的新消息:" + message);
	}

	@Override
	public void onPMessage(String pattern, String channel, String message) {
		// TODO Auto-generated method stub
		System.err.println("收到来自" + channel + "的新消息:" + message);
	}

	@Override
	public void onSubscribe(String channel, int subscribedChannels) {
		// TODO Auto-generated method stub
		System.err.println("恭喜你订阅:" + channel + "成功!");
	}

}
           

测试类

public class Doit {

	public void toBePublisher() throws InterruptedException {
		String channel = "broadcast_channel";
		Jedis jedis = new Jedis("localhost");
		JedisPool jedisPool = JedisPoolDemo.initJedisPool();

		Publisher publisher = new Publisher(jedisPool);
		publisher.run();

		// jedis.publish(channel, "hello world , broadcast_channel");
	}

	public void toBeSubscriber() {
		String channel = "broadcast_channel";
		Jedis jedis = new Jedis("localhost");
		JedisPool jedisPool = JedisPoolDemo.initJedisPool();

		Subscriber subscriber = new Subscriber();
		jedis.subscribe(subscriber, channel);
	}

	public static void main(String[] args) throws InterruptedException {
		new Doit().toBeSubscriber();
	}
}
           

toBePublisher

Redis(十)订阅和发布(sub/pub)以及Java实现代码SubscribePublishPubsubUnaubscribePsubscribe  PunsubscribeJava实现发布订阅

toBeSubscriber

注意到返回(integer2)为两个订阅者:我额外开了一个命令窗口订阅挂着。

Redis(十)订阅和发布(sub/pub)以及Java实现代码SubscribePublishPubsubUnaubscribePsubscribe  PunsubscribeJava实现发布订阅
😘如果对你有所帮助点个赞,给点小动力,谢谢啦~ 

继续阅读