前言:
本文基于jedis 2.9.0.jar、commons-pool2-2.4.2.jar以及json-20160810.jar
其中jedis連接配接池需要依賴commons-pool2包,json包用于對象執行個體和json字元串的互相轉換
1、jedis的消息隊列方法簡述
1.1、釋出消息方法
(其中,channel是對應消息通道,message是對應消息體)
jedis.publish(channel, message);
1.2、監聽消息方法
(其中,jedisPubSub用于處理監聽到的消息,channels是對應的通道)
jedis.subscribe(jedisPubSub, channels);
2、釋出消息
/**
* 從jedis連接配接池擷取jedis操作執行個體
* @return
*/
public static Jedis getJedis() {
return RedisPoolManager.getJedis();
}
/**
* 推入消息到redis消息通道
*
* @param String
* channel
* @param String
* message
*/
public static void publish(String channel, String message) {
Jedis jedis = null;
try {
jedis = getJedis();
jedis.publish(channel, message);
} finally {
jedis.close();
}
}
/**
* 推入消息到redis消息通道
*
* @param byte[]
* channel
* @param byte[]
* message
*/
public void publish(byte[] channel, byte[] message) {
Jedis jedis = null;
try {
jedis = getJedis();
jedis.publish(channel, message);
} finally {
jedis.close();
}
}
3、監聽消息
3.1、監聽消息主體方法
/**
* 監聽消息通道
* @param jedisPubSub - 監聽任務
* @param channels - 要監聽的消息通道
*/
public static void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) {
Jedis jedis = null;
try {
jedis = getJedis();
jedis.subscribe(jedisPubSub, channels);
} finally {
jedis.close();
}
}
/**
* 監聽消息通道
* @param jedisPubSub - 監聽任務
* @param channels - 要監聽的消息通道
*/
public static void subscribe(JedisPubSub jedisPubSub, String... channels) {
Jedis jedis = null;
try {
jedis = getJedis();
jedis.subscribe(jedisPubSub, channels);
} finally {
jedis.close();
}
}
3.2、處理監聽到的消息任務
class Tasker implements Runnable {
private String[] channel = null;//監聽的消息通道
private JedisPubSub jedisPubSub = null;//消息處理任務
public Tasker(JedisPubSub jedisPubSub, String ...channel) {
this.jedisPubSub = jedisPubSub;
this.channel = channel;
}
@Override
public void run() {
// 監聽channel通道的消息
RedisMQ.subscribe(jedisPubSub, channel);
}
}
3.3、處理監聽到的消息主體類實作
package cn.eguid.livePushServer.redisManager;
import java.util.Map;
import org.json.JSONObject;
import cc.eguid.livepush.PushManager;
import redis.clients.jedis.JedisPubSub;
public class RedisMQHandler extends JedisPubSub{
PushManager pushManager = null;
public RedisMQHandler(PushManager pushManager) {
super();
this.pushManager = pushManager;
}
@Override
// 接收到消息後進行分發執行
public void onMessage(String channel, String message) {
JSONObject jsonObj = new JSONObject(message);
System.out.println(channel+","+message);
if ("push".equals(channel)) {
Map<String,Object> map=jsonObj.toMap();
System.out.println("接收到一條推流消息,準備推流:"+map);
// String appName=pushManager.push(map);
//推流完成後還需要釋出一個成功消息到傳回隊列
} else if ("close".equals(channel)) {
String appName=jsonObj.getString("appName");
System.out.println("接收到一條關閉消息,準備關閉應用:"+appName);
// pushManager.closePush(appName);
}
}
}
4、測試消息隊列釋出和監聽
public static void main(String[] args) throws InterruptedException {
PushManager pushManager= new PushManagerImpl();
Thread t1 = new Thread(new Tasker(new RedisMQHandler (pushManager), "push"));
Thread t2 = new Thread(new Tasker(new RedisMQHandler (pushManager), "close"));
t1.start();
t2.start();
LivePushEntity livePushInfo=new LivePushEntity();
livePushInfo.setAppName("test1");
JSONObject json=new JSONObject(livePushInfo);
publish("push",json.toString());
publish("close", json.toString());
Thread.sleep(2000);
publish("push", json.toString());
publish("close",json.toString());
Thread.sleep(2000);
publish("push", json.toString());
publish("close",json.toString());
}