天天看點

Redis: Jedis中publish/subscribe 使用

在Redis早期版本就已經提供publish/subscribe 模式,該文使用Jedis用戶端的一個小例子.

Jedis 類中提供:

在Jedis中提供 釋出二進制編碼 ,string字元串 以及pattern比對模式三種方式來釋出publish消息.

public Long publish(final String channel, final String message);

public Long publish(byte[] channel, byte[] message);

public List<String> pubsubChannels(String pattern) ;

同時提供 二進制編碼和string字元串來訂閱消息.

public void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels)

public void subscribe(final JedisPubSub jedisPubSub, final String... channels)

在訂閱消息中涉及到2個重要類.BinaryJedisPubSub 和JedisPubSub 類,這2個類用來處理收到消息時,對消息的邏輯處理.

public abstract class JedisPubSub {

}

public abstract class BinaryJedisPubSub {

}

這兩個類為抽象類必須通過使用者來實作該類. 這兩個類中分别有重要的方法onMessage 當收到消息時需要處理.

public void onMessage(byte[] channel, byte[] message) {

  }

  public void onMessage(String channel, String message) {

  }

PublishMsg.java  釋出消息端:

Jedis jedis = new Jedis("localhost");
			    //釋出Protocol Buffer 協定消息
			    
			    Builder builder = UserBean.newBuilder();
			    builder.setId(1000);
			    UserBean userbean = builder.build();
			    ByteArrayOutputStream output =new ByteArrayOutputStream();
			    userbean.writeTo(output);
			    long loop=0;
				while (loop++<10000) {
					//釋出userbean 二進制消息
					jedis.publish("userbean".getBytes(), output.toByteArray());

					Thread.sleep(1000);
				} 
			    jedis.disconnect();	
           

SubscribeMsg.java 訂閱消息端:

Jedis jedis = new Jedis("localhost");
	    
        //業務邏輯處理
	    UserBeanListener l =new UserBeanListener();
	    //訂閱userbean二進制消息
	    jedis.subscribe(l, "userbean".getBytes());

	    long loop=0;
		while (loop++<10000)
		{
			Thread.sleep(1000);
		} 
	    jedis.disconnect();	
           

UserBeanListener.java 業務消息處理:

public class UserBeanListener extends BinaryJedisPubSub {

	@Override
	public void onMessage(byte[] channel, byte[] message) {
		
		try {
			UserBean u = UserBean.parseFrom(message);
			System.out.println(u.getId());
			
		} catch (InvalidProtocolBufferException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	
}
           

UserMsg.proto  ProtocolBuffer協定檔案:

message UserBean{
	
	// ID(必需)
	required int32 id = 1;

}
           

繼續閱讀