引言
了解Jedis的童鞋可能清楚,Jedis中
JedisCluster
是不支援pipeline操作的,如果使用了redis叢集,在
spring-boot-starter-data-redis
中又正好用到的pipeline,那麼會接收到
Pipeline is currently not supported for JedisClusterConnection.
這樣的報錯。錯誤來自于
org.springframework.data.redis.connection.jedis.JedisClusterConnection
:
/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.RedisConnection#openPipeline()
*/
@Override
public void openPipeline() {
throw new UnsupportedOperationException("Pipeline is currently not supported for JedisClusterConnection.");
}
org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration
會幫我們自動配置,無論你redis使用的是standalone、sentinel、cluster配置。這個源碼很容易了解,讀者可自行閱讀,不了解的可以一起讨論。
Lettuce中的pipeline
spring boot 2.0開始,配置
spring-boot-starter-data-redis
将不依賴Jedis,而是依賴Lettuce,在Lettuce中,redis cluster使用pipeline不會有問題。
知識儲備
再往下看可能需要讀者具備如下的能力:
- redis cluster hash slot
- JedisCluster & Jedis的關系
- pipeline和*mset等指令的差別
哈希槽(hash slot)
redis cluster一共有16384個桶(hash slot),用來裝資料,建立叢集的時候每個叢集節點會負責一些slot的資料存儲,比如我負責0-1000,你負責1001-2000,他負責2001-3000……
資料存儲時,每個key在存入redis cluster前,會利用CRC16計算出一個值,這個值就是對應redis cluster的hash slot,就知道這個key會被放到哪個伺服器上了。
參考文檔:
Redis 叢集教程 Redis 叢集規範JedisCluster本質上是使用Jedis來和redis叢集進行打交道的,具體過程是:
- 擷取該key的slot值:
JedisClusterCRC16.getSlot(key)
- 從
執行個體中擷取到該slot對應的JedisClusterConnectionHandler
執行個體:Jedis
Jedis connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
- 利用connection操作。
redis提供了mset,hmset之類的指令,或者說集合操作可以使用sadd key 1 2 3 4 5 6 ..... 10000000000這種一口氣傳一堆資料的指令。
有時候你甚至會發現*mset這種一口氣操作一堆資料的速度更快。那麼這種使用場景會有什麼弊端呢?答案是:阻塞。
操作這一堆資料需要多久,就會阻塞多久。
Redis Cluster下pipeline使用的思考
由于JedisCluster中的所有操作本質上是使用Jedis,而Jedis是支援pipeline操作的,所有,要在redis cluster中使用pipeline是有可能的,隻要你操作同一個鍵即可,準确的說,應該是你操作的鍵位于同一台伺服器,更直白的,你操作的鍵是同一個Jedis執行個體。ok,如果你已經暈了,那你需要回看一下“知識儲備”。
說說筆者的使用場景吧,我們是把csv檔案的一批資料讀到記憶體中,同一批資料是存儲到同一個key中的,最後的操作會類似于:
set key member1
set key member2
set key member3
...
set key member100000
操作的是同一個key,可以利用JedisCluster擷取到該key的Jedis執行個體,然後利用pipeline操作。
讓spring-data-redis也支援pipeline的思路
提供一下代碼思路。
RedisConnectionFactory factory = redisTemplate.getConnectionFactory();
RedisConnection redisConnection = factory.getConnection();
JedisClusterConnection jedisClusterConnection = (JedisClusterConnection) redisConnection;
// 擷取到原始到JedisCluster連接配接
JedisCluster jedisCluster = jedisClusterConnection.getNativeConnection();
// 通過key擷取到具體的Jedis執行個體
// 計算hash slot,根據特定的slot可以擷取到特定的Jedis執行個體
int slot = JedisClusterCRC16.getSlot(key);
/**
* 不建議這麼使用,官方在2.10版本已經修複<a href="https://github.com/xetorthio/jedis/pull/1532">此問題</a><br>
* 2.10版本中,官方會直接提供JedisCluster#getConnectionFromSlot
*/
Field field = ReflectionUtils.findField(BinaryJedisCluster.class, null, JedisClusterConnectionHandler.class);
field.setAccessible(true);
JedisSlotBasedConnectionHandler jedisClusterConnectionHandler = (JedisSlotBasedConnectionHandler) field.get(jedisCluster);
Jedis jedis = jedisClusterConnectionHandler.getConnectionFromSlot(slot);
// 接下來就是pipeline操作了
Pipeline pipeline = jedis.pipelined();
...
pipeline.syncAndReturnAll();
以上代碼完全可以模仿spring-data-redis中
RedisTemplate#executePipelined
方法寫成一個通用的方法,供使用者調用。