天天看點

Redis的Keyspace notifications功能初探

最近在做一套系統,其中要求若幹個Worker伺服器将心跳資訊都上報給中央伺服器。當一定時間中央伺服器沒有得到心跳資訊時則認為該Worker失效了,發出告警。

滿足這種需求的解決方法多種多樣,我開始想到了memcache,上報一次心跳資訊就重新整理一次緩存,當緩存内心跳資訊對象逾時被删除,即認為對應的Worker失效。然而由于memcache的工作原理,删除都是被動的,我們無法及時判斷資料是否過期,即便知道了資料過期,也沒有一種機制來回調方法來執行自定義的處理動作。難道緩存架構就真的不行了嗎?

答案是否定的。在Redis 2.8.0版本起,加入了“Keyspace notifications”(即“鍵空間通知”)的功能。如何了解該功能呢?我們來看下官方是怎麼說的:

鍵空間通知,允許Redis用戶端從“釋出/訂閱”通道中建立訂閱關系,以便用戶端能夠在Redis中的資料因某種方式受到影響時收到相應事件。

可能接收到的事件舉例如下:

影響一個給出的鍵的所有指令(會告訴你哪個鍵被執行了一個指令,這個指令是什麼);

接收到了一個LPUSH操作的所有鍵(LPUSH指令:key v1 [v2 v3..]将指定的所有值從左到右進行壓棧操作,形成一個棧,并将該棧命名為指定的key);

在資料庫0中失效的所有鍵(不一定非得是資料庫0,這裡這樣表述其實想表達可以知道影響的哪個資料庫)。

看到這裡我聯想到,如果一條緩存資料失效了,通過訂閱關系,用戶端會收到消息,通過分析消息可以得知何種消息,分析消息内容可以知道是哪個key失效了。這樣就可以間接實作開頭所描述的功能。

接下來我們來看下實驗的步驟:

1.準備redis伺服器

作為開源軟體,redis下載下傳後得到的是源代碼,使用tar -xzvf redis-2.8.9.tar.gz解壓後對其進行編譯,過程也很簡單,make就可以了。編譯完成之後可以使用自帶的runtest進行測試,看是否編譯成功。然後就是安裝了,執行make PREFIX=/usr/local/redis-2.8.9 install,PREFIX參數指定的就是安裝路徑。現在安裝的隻有可執行檔案,還沒有配置檔案。其實在源碼目錄中有一個模闆redis.conf,我們對它進行改動就可以了。

其他配置我們不關心,但是官方文檔中說Keyspace notifications功能預設是關閉的(預設地,Keyspace 時間通知功能是禁用的,因為它或多或少會使用一些CPU的資源),我們需要打開它。打開的方法也很簡單,配置屬性:notify-keyspace-events

預設配置是這樣的:notify-keyspace-events ""

根據文檔中的說明:

K     Keyspace events, published with __keyspace@<db>__ prefix.
E     Keyevent events, published with __keyevent@<db>__ prefix.
g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
$     String commands
l     List commands
s     Set commands
h     Hash commands
z     Sorted set commands
x     Expired events (events generated every time a key expires)
e     Evicted events (events generated when a key is evicted for maxmemory)
A     Alias for g$lshzxe, so that the "AKE" string means all the events.      

我們配置為:

notify-keyspace-events Ex,含義為:釋出key事件,使用過期事件(當每一個key失效時,都會生成該事件)。

2.準備用戶端和連接配接配置

本文中使用的用戶端是Jedis,版本為2.4.2,為了代碼的通用性,我使用Spring來管理連接配接:

<bean id="pool" class="redis.clients.jedis.JedisPool">
      <constructor-arg>
        <bean id="config" class="redis.clients.jedis.JedisPoolConfig">
          <property name="maxIdle" value="0" />
          <property name="maxTotal" value="20" />
          <property name="maxWaitMillis" value="1000" />
          <property name="testOnBorrow" value="true" />
      </bean>
      </constructor-arg>
      <constructor-arg>
        <value>192.168.1.100</value>
      </constructor-arg>
      <constructor-arg>
    <value>6379</value>
  </constructor-arg>
</bean>      

然後使用Spring Test和Junit來測試代碼

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/applicationContext*.xml")
public class RedisSubscribeDemo {
    
    private static final Log Log= LogFactory.getLog(RedisSubscribeDemo.class);
    
    @Resource
    private JedisPool pool;
    
    @Test
    public void doTest() throws InterruptedException {
        for (int i = 0; i < 1; i++) {
            TestThread thread= new TestThread(pool);
            thread.start();
        }
        Thread.sleep(50000L);
        Log.info("Test finish...");
    }
}      

由于要使用一定的延遲,我們把主要測試代碼放到了TestThread中。當測試線程啟動後,主線程停滞50秒,讓我們有足夠的時間來操作。

public class TestThread extends Thread {
  
  private Log log= LogFactory.getLog(TestThread.class);
  
  private JedisPool pool;
  
  public TestThread(JedisPool pool){
    log.info("loading test thread");
    this.pool= pool;
  }

  @Override
  public void run() {
    Jedis jedis= pool.getResource();
    jedis.psubscribe(new MySubscribe(), "*");
    try {
      Thread.sleep(10000L);
    } catch (InterruptedException e) {
      log.info("延時失敗", e);
    }
    jedis.close();
    log.info("Test run finished");
  }
}      

在測試線程中,我們将自定義的MySubscribe加入到了Jedis的模闆訂閱(即psubscribe,因為模闆訂閱的channel是支援星号'*'通配的,這樣可以收集到多個通配通道的消息,而與之相反的還有一個subscribe,此訂閱隻能指定嚴格比對的通道)中,同樣為了測試過程能夠将結果顯示出來,在綁定了訂閱後,對該線程進行了延時10秒。

public class MySubscribe extends JedisPubSub {
  
  private static final Log log= LogFactory.getLog(MySubscribe.class);

  // 初始化按表達式的方式訂閱時候的處理  
  public void onPSubscribe(String pattern, int subscribedChannels) {  
        log.info(pattern + "=" + subscribedChannels);
  }
  
  // 取得按表達式的方式訂閱的消息後的處理  
  public void onPMessage(String pattern, String channel, String message) {  
        log.info(pattern + "=" + channel + "=" + message);
  }

  ...其他未用到的重寫方法忽略
}      

作為Jedis自定義訂閱,必須繼承redis.clients.jedis.JedisPubSub類,在

psubscribe模式下,重點重寫onPMessage方法,該方法為接收到模闆訂閱後處理事件的重要代碼。pattern為在綁定訂閱時使用的通配模闆,channel為通配後符合條件的實際通道名稱,message就不用多說了,就是事件消息内容。

3.實戰

通過Redis自帶的redis-cli指令,我們可以在服務端通過指令行的方式直接操作。我們運作上面的示例代碼,然後迅速切換到redis-cli指令中,建立一個生命周期很短暫的資料:

127.0.0.1:6379> set chaijunkun 123 PX 100      

PX參數指定生命周期機關為毫秒,100即聲明周期,即100毫秒。key為chaijunkun的資料,其值為123。

當執行語句後,回顯:

OK      

這時我們看執行個體程式的輸出:

*=__keyevent@0__:expired=chaijunkun      

從輸出可以看出,之前指定的通配符為*,通配任何通道;之後是實際的通道名稱:__keyevent@0__:expired,這裡我們可以看到訂閱收到了一個keyevent位于資料庫0,事件類型為:expired,是一個過期事件;最後是chaijunkun,這個是過期資料的key。

在官方文檔中,keyevent通道的格式永遠是這樣的:

__keyevent@<db>__:prefix

對于資料過期事件,我們在綁定訂閱時通配模闆也可以精确地寫成:

__keyevent@*__:expired

通過示例代碼,我們可以看到确實印證了之前的構想,實作了資料過期的事件觸發(event)或者說回調(callback)。

4.其他應用

之前的代碼中,對于事件的釋出都是由Redis自己生成的。實際上在指令中主動釋出自定義消息也是可以的,在publish指令的幫助中我們看到:

127.0.0.1:6379> help publish

  PUBLISH channel message
  summary: Post a message to a channel
  since: 2.0.0
  group: pubsub      

通過參數,可以自定義通道名稱和通道消息。而在Jedis中,釋出API甚至做到了位元組資料的級别:

繼續閱讀