天天看點

使用jedis實作Redis消息隊列(MQ)的釋出(publish)和消息監聽(subscribe)前言:1、jedis的消息隊列方法簡述2、釋出消息3、監聽消息4、測試消息隊列釋出和監聽

前言:

本文基于jedis 2.9.0.jar、commons-pool2-2.4.2.jar以及json-20160810.jar

其中jedis連接配接池需要依賴commons-pool2包,json包用于對象執行個體和json字元串的互相轉換

1、jedis的消息隊列方法簡述

1.1、釋出消息方法

(其中,channel是對應消息通道,message是對應消息體)

jedis.publish(channel, message);

1.2、監聽消息方法

(其中,jedisPubSub用于處理監聽到的消息,channels是對應的通道)

jedis.subscribe(jedisPubSub, channels);

2、釋出消息

/**
	 * 從jedis連接配接池擷取jedis操作執行個體
	 * @return
	 */
	public static Jedis getJedis() {
		return RedisPoolManager.getJedis();
	}

	/**
	 * 推入消息到redis消息通道
	 * 
	 * @param String
	 *            channel
	 * @param String
	 *            message
	 */
	public static void publish(String channel, String message) {
		Jedis jedis = null;
		try {
			jedis = getJedis();
			jedis.publish(channel, message);
		} finally {
			jedis.close();
		}
	}

	/**
	 * 推入消息到redis消息通道
	 * 
	 * @param byte[]
	 *            channel
	 * @param byte[]
	 *            message
	 */
	public void publish(byte[] channel, byte[] message) {
		Jedis jedis = null;
		try {
			jedis = getJedis();
			jedis.publish(channel, message);
		} finally {
			jedis.close();
		}

	}
           

3、監聽消息

3.1、監聽消息主體方法

/**
	 * 監聽消息通道
	 * @param jedisPubSub - 監聽任務
	 * @param channels - 要監聽的消息通道
	 */
	public static void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) {
		Jedis jedis = null;
		try {
			jedis = getJedis();
			jedis.subscribe(jedisPubSub, channels);
		} finally {
			jedis.close();
		}
	}

	/**
	 * 監聽消息通道
	 * @param jedisPubSub - 監聽任務
	 * @param channels - 要監聽的消息通道
	 */
	public static void subscribe(JedisPubSub jedisPubSub, String... channels) {
		Jedis jedis = null;
		try {
			jedis = getJedis();
			jedis.subscribe(jedisPubSub, channels);
		} finally {
			jedis.close();
		}
	}
           

3.2、處理監聽到的消息任務

class Tasker implements Runnable {
	private String[] channel = null;//監聽的消息通道
	private JedisPubSub jedisPubSub = null;//消息處理任務

	public Tasker(JedisPubSub jedisPubSub, String ...channel) {
		this.jedisPubSub = jedisPubSub;
		this.channel = channel;
	}

	@Override
	public void run() {
		// 監聽channel通道的消息
		RedisMQ.subscribe(jedisPubSub, channel);
	}

}
           

3.3、處理監聽到的消息主體類實作

package cn.eguid.livePushServer.redisManager;

import java.util.Map;

import org.json.JSONObject;

import cc.eguid.livepush.PushManager;
import redis.clients.jedis.JedisPubSub;

public class RedisMQHandler extends JedisPubSub{
	PushManager pushManager = null;

	public RedisMQHandler(PushManager pushManager) {
		super();
		this.pushManager = pushManager;
	}

	@Override
	// 接收到消息後進行分發執行
	public void onMessage(String channel, String message) {
		JSONObject jsonObj = new JSONObject(message);
		System.out.println(channel+","+message);
		if ("push".equals(channel)) {
			Map<String,Object> map=jsonObj.toMap();
			System.out.println("接收到一條推流消息,準備推流:"+map);
//			String appName=pushManager.push(map);
			//推流完成後還需要釋出一個成功消息到傳回隊列
			
		} else if ("close".equals(channel)) {
			String appName=jsonObj.getString("appName");
			System.out.println("接收到一條關閉消息,準備關閉應用:"+appName);
//			pushManager.closePush(appName);
		}
	}
}
           

4、測試消息隊列釋出和監聽

public static void main(String[] args) throws InterruptedException {
		
		PushManager pushManager= new PushManagerImpl();
		Thread t1 = new Thread(new Tasker(new RedisMQHandler (pushManager), "push"));
		Thread t2 = new Thread(new Tasker(new RedisMQHandler (pushManager), "close"));
		t1.start();
		t2.start();
		
		LivePushEntity livePushInfo=new LivePushEntity();
		livePushInfo.setAppName("test1");
		JSONObject json=new JSONObject(livePushInfo);
		publish("push",json.toString());
		publish("close", json.toString());
		Thread.sleep(2000);
		publish("push", json.toString());
		publish("close",json.toString());
		Thread.sleep(2000);
		publish("push", json.toString());
		publish("close",json.toString());

	}