天天看点

jedis实现订阅发布-publish/subscribe

本文由larrylgq编写,转载请注明出处:http://blog.csdn.net/larrylgq/article/details/7395261

作者:吕桂强

邮箱:[email protected]

本例包括

jedis_demo:入口类

jedis_control:jedis控制器(负责从连接池获取连接并归还连接)

jedis_pub_sub_listener:订阅的监听器

singleton_agent:单例的代理类(连接池配置)

package com.larry.jedis;

import redis.clients.jedis.Jedis;

/**
 * Entry point of the publish/subscribe demo.
 *
 * <p>{@link #main(String[])} starts a background thread that pattern-subscribes
 * to {@code news.share}, waits until the subscription has been acknowledged,
 * and then publishes two messages to that channel from the main thread.
 *
 * @author 吕桂强
 * @email [email protected]
 * @version created 2012-3-28 12:12:41 PM
 */
public class jedis_demo {
	jedis_control redis_util = jedis_control.get_singleton();
	
	/**
	 * Runs the demo: subscribe on a worker thread, then publish.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		jedis_demo demo = new jedis_demo();
		// Released by the listener once the pattern subscription is acknowledged.
		// Pub/sub messages are not buffered, so anything published before the
		// subscription is active would never reach the listener.
		final java.util.concurrent.CountDownLatch subscribed =
				new java.util.concurrent.CountDownLatch(1);
		
		new Thread(new Runnable(){
			@Override
			public void run() {
				jedis_control control = jedis_control.get_singleton();
				Jedis jedis = control.get_connection();
				if (jedis == null) {
					// get_connection() already printed the cause and returned null.
					System.err.println("subscriber: could not obtain a Redis connection");
					return;
				}
				// Same printing behaviour as jedis_pub_sub_listener, plus a signal to
				// the main thread when the pattern subscription is in place.
				jedis_pub_sub_listener pub_sub_listener = new jedis_pub_sub_listener() {
					public void onPSubscribe(String pattern, int subscribedChannels) {
						super.onPSubscribe(pattern, subscribedChannels);
						subscribed.countDown();
					}
				};
				try {
					// Several channels can be subscribed at once, e.g.
					//   jedis.subscribe(listener, "news.share", "news.log");
					// Received messages are handled in the listener's onMessage /
					// onPMessage callbacks. This call blocks while subscribed.
					jedis.psubscribe(pub_sub_listener, new String[] { "news.share" });
				} finally {
					// Hand the connection back instead of leaking it from the pool.
					control.close_connection(jedis);
				}
			}
		}).start();
		
		try {
			// Bounded wait so the demo cannot hang forever if subscribing fails.
			if (!subscribed.await(5, java.util.concurrent.TimeUnit.SECONDS)) {
				System.err.println("subscriber not ready after 5s; publishing anyway");
			}
		} catch (InterruptedException e) {
			// Preserve the interrupt status for whoever runs this thread.
			Thread.currentThread().interrupt();
		}
		
		demo.publish();
	}

	/**
	 * Publishes two messages to the {@code news.share} channel and returns the
	 * borrowed connection to the pool afterwards.
	 *
	 * @throws IllegalStateException if no Redis connection could be obtained
	 */
	public void publish() {
		Jedis jedis = redis_util.get_connection();
		if (jedis == null) {
			throw new IllegalStateException("publisher: could not obtain a Redis connection");
		}
		try {
			jedis.publish("news.share", "ok");
			jedis.publish("news.share", "hello word");
		} finally {
			redis_util.close_connection(jedis);
		}
	}
}
           
package com.larry.jedis;

import redis.clients.jedis.Jedis;

/**
 * Jedis controller: borrows connections from, and returns them to, the pool
 * exposed by {@code singleton_agent.get_jedispool()}.
 *
 * @author 吕桂强
 * @email [email protected]
 * @version created 2012-3-28 12:03:40 PM
 */
public final class jedis_control {
	// Lazily created singleton instance; guarded by the class lock in get_singleton().
	private static jedis_control _jedis_control;

	/**
	 * Returns the shared controller, creating it on first use.
	 *
	 * <p>Synchronized so that concurrent first calls cannot each create their
	 * own instance.
	 *
	 * @return the single {@code jedis_control} instance
	 */
	public static synchronized jedis_control get_singleton(){
		if(_jedis_control == null){
			_jedis_control = new jedis_control();
		}
		return _jedis_control;
	}
	
	/**
	 * Borrows a connection from the pool.
	 *
	 * @return a Jedis connection, or {@code null} if borrowing failed (the
	 *         exception's stack trace is printed); callers must check for null
	 */
	public Jedis get_connection() {
		Jedis jedis = null;
		try {
			jedis = singleton_agent.get_jedispool().getResource();
		} catch (Exception e) {
			e.printStackTrace();
		}
		return jedis;
	}
	
	/**
	 * Returns a connection to the pool. A {@code null} argument is ignored, and
	 * any exception raised while returning is printed rather than propagated.
	 *
	 * @param jedis the connection to give back; may be {@code null}
	 */
	public void close_connection(Jedis jedis) {
		if (null != jedis) {
			try {
				singleton_agent.get_jedispool().returnResource(jedis);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}
           
package com.larry.jedis;

import redis.clients.jedis.JedisPubSub;

/**
 * Pub/sub listener that prints every event it receives to standard output,
 * with the event's fields joined by {@code '='}.
 *
 * @author 吕桂强
 * @email [email protected]
 * @version created 2012-3-28 12:09:20 PM
 */
public class jedis_pub_sub_listener extends JedisPubSub {
	/** Prints {@code key=value} on its own line. */
	private static void print_pair(String key, Object value) {
		System.out.println(key + "=" + value);
	}

	/** Called for a message received on a channel subscription. */
	public void onMessage(String channel, String message) {
		print_pair(channel, message);
	}

	/** Called when a channel subscription is set up. */
	public void onSubscribe(String channel, int subscribedChannels) {
		print_pair(channel, subscribedChannels);
	}

	/** Called when a channel subscription is cancelled. */
	public void onUnsubscribe(String channel, int subscribedChannels) {
		print_pair(channel, subscribedChannels);
	}

	/** Called when a pattern subscription is set up. */
	public void onPSubscribe(String pattern, int subscribedChannels) {
		print_pair(pattern, subscribedChannels);
	}

	/** Called when a pattern subscription is cancelled. */
	public void onPUnsubscribe(String pattern, int subscribedChannels) {
		print_pair(pattern, subscribedChannels);
	}

	/** Called for a message received through a pattern subscription. */
	public void onPMessage(String pattern, String channel, String message) {
		print_pair(pattern + "=" + channel, message);
	}
}
           
package com.larry.jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * Holder for process-wide singletons; currently the shared Jedis connection pool.
 *
 * @author 吕桂强
 * @email [email protected]
 * @version created 2012-3-28 12:30:42 PM
 */
public class singleton_agent {
	//**************** singleton connection pool ***************
	// Created on first use; guarded by the class lock in get_jedispool().
	private static JedisPool jedispool = null;

	/**
	 * Returns the shared connection pool, creating and configuring it on first use.
	 *
	 * <p>Synchronized so that threads racing on the first call cannot each build
	 * a pool and leave one of them orphaned.
	 *
	 * @return the single {@code JedisPool}, connected to localhost:6379
	 */
	public static synchronized JedisPool get_jedispool() {
		if(jedispool == null){
			JedisPoolConfig jedispool_config = new JedisPoolConfig();
			jedispool_config.maxActive = 20;
			// NOTE(review): maxIdle = 0 presumably means no idle connections are kept,
			// so returned connections would not be reused; confirm this is intended.
			jedispool_config.maxIdle = 0;
			jedispool_config.maxWait = 1000;
			jedispool_config.testOnBorrow = true;
			jedispool = new JedisPool(jedispool_config, "localhost", 6379);
		}
		return jedispool;
	}
	//end************* singleton connection pool ***************
}