Redis是基于TCP連接配接進行通信
Redis是使用用戶端 - 伺服器模型的TCP伺服器,稱為請求/響應協定。
這意味着通常一個請求是通過以下步驟完成的:
用戶端向伺服器發送查詢,并通常以阻塞的方式從套接字讀取伺服器響應。
伺服器處理指令并将響應發送回用戶端。
知道redis是基于TCP連接配接進行通信的,每一個request/response都需要經曆一個RTT(Round-Trip Time 往返時間),如果需要執行很多短小的指令,這些往返時間的開銷是很大的,在此情形下,redis提出了管道來提高執行效率。
pipeline的思想
如果client執行一些互相之間無關的指令或者不需要擷取指令的傳回值,那麼redis允許你連續發送多條指令,而不需要等待前面指令執行完畢。
比如我們執行3條INCR指令,如果使用管道,理論上隻需要一個RTT+3條指令的執行時間即可,如果不适用管道,那麼可能需要額外的兩個RTT時間。
是以,管道相當于批處理腳本,相當于是指令集。
pipeline不是打包的指令越多越好
通過pipeline方式當有大批量的操作時候。我們可以節省很多原來浪費在網絡延遲的時間。
需要注意到是用 pipeline方式打包指令發送,redis必須在處理完所有指令前先緩存起所有指令的處理結果。
打包的指令越多,緩存消耗記憶體也越多。是以并不是打包的指令越多越好。具體多少合适需要根據具體情況測試。
pipeline常用API

package redis.clients.jedis;
import redis.clients.jedis.exceptions.JedisDataException;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class Pipeline extends MultiKeyPipelineBase implements Closeable {
private MultiResponseBuilder currentMulti;
private class MultiResponseBuilder extends Builder<List<Object>> {
private List<Response<?>> responses = new ArrayList<Response<?>>();
@Override
public List<Object> build(Object data) {
@SuppressWarnings("unchecked")
List<Object> list = (List<Object>) data;
List<Object> values = new ArrayList<Object>();
if (list.size() != responses.size()) {
throw new JedisDataException("Expected data size " + responses.size() + " but was "
+ list.size());
}
for (int i = 0; i < list.size(); i++) {
Response<?> response = responses.get(i);
response.set(list.get(i));
Object builtResponse;
try {
builtResponse = response.get();
} catch (JedisDataException e) {
builtResponse = e;
}
values.add(builtResponse);
}
return values;
}
public void setResponseDependency(Response<?> dependency) {
for (Response<?> response : responses) {
response.setDependency(dependency);
}
}
public void addResponse(Response<?> response) {
responses.add(response);
}
}
@Override
protected <T> Response<T> getResponse(Builder<T> builder) {
if (currentMulti != null) {
super.getResponse(BuilderFactory.STRING); // Expected QUEUED
Response<T> lr = new Response<T>(builder);
currentMulti.addResponse(lr);
return lr;
} else {
return super.getResponse(builder);
}
}
public void setClient(Client client) {
this.client = client;
}
@Override
protected Client getClient(byte[] key) {
return client;
}
@Override
protected Client getClient(String key) {
return client;
}
public void clear() {
if (isInMulti()) {
discard();
}
sync();
}
public boolean isInMulti() {
return currentMulti != null;
}
/**
* Synchronize pipeline by reading all responses. This operation close the pipeline. In order to
* get return values from pipelined commands, capture the different Response<?> of the
* commands you execute.
*/
public void sync() {
if (getPipelinedResponseLength() > 0) {
List<Object> unformatted = client.getAll();
for (Object o : unformatted) {
generateResponse(o);
}
}
}
/**
* Synchronize pipeline by reading all responses. This operation close the pipeline. Whenever
* possible try to avoid using this version and use Pipeline.sync() as it won't go through all the
* responses and generate the right response type (usually it is a waste of time).
* @return A list of all the responses in the order you executed them.
*/
public List<Object> syncAndReturnAll() {
if (getPipelinedResponseLength() > 0) {
List<Object> unformatted = client.getAll();
List<Object> formatted = new ArrayList<Object>();
for (Object o : unformatted) {
try {
formatted.add(generateResponse(o).get());
} catch (JedisDataException e) {
formatted.add(e);
}
}
return formatted;
} else {
return java.util.Collections.<Object> emptyList();
}
}
public Response<String> discard() {
if (currentMulti == null) throw new JedisDataException("DISCARD without MULTI");
client.discard();
currentMulti = null;
return getResponse(BuilderFactory.STRING);
}
public Response<List<Object>> exec() {
if (currentMulti == null) throw new JedisDataException("EXEC without MULTI");
client.exec();
Response<List<Object>> response = super.getResponse(currentMulti);
currentMulti.setResponseDependency(response);
currentMulti = null;
return response;
}
public Response<String> multi() {
if (currentMulti != null) throw new JedisDataException("MULTI calls can not be nested");
client.multi();
Response<String> response = getResponse(BuilderFactory.STRING); // Expecting
// OK
currentMulti = new MultiResponseBuilder();
return response;
}
@Override
public void close() throws IOException {
clear();
}
}
- Pipeline在某些場景下非常有用,比如有多個command需要被“及時的”送出,而且他們對相應結果沒有互相依賴,而且對結果響應也無需立即獲得,那麼pipeline就可以充當這種“批處理”的工具;而且在一定程度上,可以較大的提升性能,性能提升的原因主要是TCP連結中較少了“互動往返”的時間。
通過Jedis操作pipeline
- pipeline中不同資料結構的指令和不使用pipeline是一樣,具體參考: Redis不同資料類型指令使用及應用場景_琦彥-CSDN部落格_redis不同資料類型的應用場景
@Test
public void testPipeline() {
Jedis jedis = null;
Pipeline pipeline = null;
try {
// 建立一個jedis的對象。
jedis = new Jedis("ip", 6379);
jedis.auth("密碼");
// 擷取一個管道對象
pipeline = jedis.pipelined();
// 删除已經存在的key
pipeline.del("pipelinedList");
// 循環添加
for (int i = 0; i < 100; i++) {
pipeline.rpush("pipelinedList",i+"");
}
// 執行
pipeline.sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 關閉pipeline
if(pipeline != null){
try {
pipeline.close();
} catch (IOException e) {
e.printStackTrace();
}
}
// 關閉jedis。
if(jedis != null){
jedis.close();
}
}
}
pipelineVS事務
- 管道和事務是不同的,pipeline隻是表達“互動”中操作的傳遞的方向性,pipeline也可以在事務中運作,也可以不在。
- 無論如何,pipeline中發送的每個command都會被server立即執行,如果執行失敗,将會在此後的相應中得到資訊;也就是pipeline并不是表達“所有command都一起成功”的語義,管道中前面指令失敗,後面指令不會有影響,繼續執行。
- 簡單來說就是管道中的指令是沒有關系的,它們隻是像管道一樣流水發給server,而不是串行執行,僅此而已;但是如果pipeline的操作被封裝在事務中,那麼将有事務來確定操作的成功與失敗。
- pipeline 隻是把多個redis指令一起發出去,redis并沒有保證這些指定的執行是原子的;multi相當于一個redis的transaction的,保證整個操作的原子性,避免由于中途出錯而導緻最後産生的資料不一緻
pipelineVS腳本
- 使用管道可能在效率上比使用script要好,但是有的情況下隻能使用script。因為在執行後面的指令時,無法得到前面指令的結果,就像事務一樣,是以如果需要在後面指令中使用前面指令的value等結果,則隻能使用script或者事務+watch。
- 使用 Redis腳本 (在Redis版本2.6或更高版本中可用),可以使用執行伺服器端所需的大量工作的腳本更高效地處理一些 pipelining 用例。
- 腳本的一大優勢是它能夠以最小的延遲讀取和寫入資料,使得讀取,計算,寫入等操作非常快速(在這種情況下,流水線操作無法提供幫助,因為用戶端先需要讀指令的回應,它才可以調用寫指令)。
- 有時,應用程式可能還想在 pipeline 中發送 EVAL 或 EVALSHA 指令。這是完全可能的,Redis通過 SCRIPT LOAD 指令明确地支援它(它保證可以調用 而沒有失敗的風險)。
來源:
Using pipelining to speedup Redis queries – Redis redis中的事務、lua腳本和管道的使用場景_fangjian1204的專欄-CSDN部落格_if redis.call