在Redis早期版本就已經提供publish/subscribe 模式,該文使用Jedis用戶端的一個小例子.
Jedis 類中提供:
在Jedis中提供 釋出二進制編碼 ,string字元串 以及pattern比對模式三種方式來釋出publish消息.
public Long publish(final String channel, final String message);
public Long publish(byte[] channel, byte[] message);
public List<String> pubsubChannels(String pattern) ;
同時提供 二進制編碼和string字元串來訂閱消息.
public void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels)
public void subscribe(final JedisPubSub jedisPubSub, final String... channels)
在訂閱消息中涉及到2個重要類.BinaryJedisPubSub 和JedisPubSub 類,這2個類用來處理收到消息時,對消息的邏輯處理.
public abstract class JedisPubSub {
}
public abstract class BinaryJedisPubSub {
}
這兩個類為抽象類必須通過使用者來實作該類. 這兩個類中分别有重要的方法onMessage 當收到消息時需要處理.
public void onMessage(byte[] channel, byte[] message) {
}
public void onMessage(String channel, String message) {
}
PublishMsg.java 釋出消息端:
Jedis jedis = new Jedis("localhost");
//釋出Protocol Buffer 協定消息
Builder builder = UserBean.newBuilder();
builder.setId(1000);
UserBean userbean = builder.build();
ByteArrayOutputStream output =new ByteArrayOutputStream();
userbean.writeTo(output);
long loop=0;
while (loop++<10000) {
//釋出userbean 二進制消息
jedis.publish("userbean".getBytes(), output.toByteArray());
Thread.sleep(1000);
}
jedis.disconnect();
SubscribeMsg.java 訂閱消息端:
Jedis jedis = new Jedis("localhost");
//業務邏輯處理
UserBeanListener l =new UserBeanListener();
//訂閱userbean二進制消息
jedis.subscribe(l, "userbean".getBytes());
long loop=0;
while (loop++<10000)
{
Thread.sleep(1000);
}
jedis.disconnect();
UserBeanListener.java 業務消息處理:
public class UserBeanListener extends BinaryJedisPubSub {
@Override
public void onMessage(byte[] channel, byte[] message) {
try {
UserBean u = UserBean.parseFrom(message);
System.out.println(u.getId());
} catch (InvalidProtocolBufferException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
UserMsg.proto ProtocolBuffer協定檔案:
message UserBean{
// ID(必需)
required int32 id = 1;
}