天天看點

Redis: pipeline加速Redis請求,實作批處理

 Redis是基于TCP連接配接進行通信

Redis是使用用戶端 - 伺服器模型的TCP伺服器,稱為請求/響應協定。

這意味着通常一個請求是通過以下步驟完成的:

  1. 用戶端向伺服器發送查詢,并通常以阻塞的方式從套接字讀取伺服器響應。

  2. 伺服器處理指令并将響應發送回用戶端。

知道redis是基于TCP連接配接進行通信的,每一個request/response都需要經曆一個RTT(Round-Trip Time 往返時間),如果需要執行很多短小的指令,這些往返時間的開銷是很大的,在此情形下,redis提出了管道來提高執行效率。

pipeline的思想

  1. 如果client執行一些互相之間無關的指令或者不需要擷取指令的傳回值,那麼redis允許你連續發送多條指令,而不需要等待前面指令執行完畢。

  2. 比如我們執行3條INCR指令,如果使用管道,理論上隻需要一個RTT+3條指令的執行時間即可,如果不适用管道,那麼可能需要額外的兩個RTT時間。

  3. 是以,管道相當于批處理腳本,相當于是指令集。

pipeline不是打包的指令越多越好

  1. 通過pipeline方式當有大批量的操作時候。我們可以節省很多原來浪費在網絡延遲的時間。

  2. 需要注意到是用 pipeline方式打包指令發送,redis必須在處理完所有指令前先緩存起所有指令的處理結果。

  3. 打包的指令越多,緩存消耗記憶體也越多。是以并不是打包的指令越多越好。具體多少合适需要根據具體情況測試。

pipeline常用API

Redis: pipeline加速Redis請求,實作批處理
Redis: pipeline加速Redis請求,實作批處理
package redis.clients.jedis;
import redis.clients.jedis.exceptions.JedisDataException;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class Pipeline extends MultiKeyPipelineBase implements Closeable {
  private MultiResponseBuilder currentMulti;
  private class MultiResponseBuilder extends Builder<List<Object>> {
    private List<Response<?>> responses = new ArrayList<Response<?>>();
    @Override
    public List<Object> build(Object data) {
      @SuppressWarnings("unchecked")
      List<Object> list = (List<Object>) data;
      List<Object> values = new ArrayList<Object>();
      if (list.size() != responses.size()) {
        throw new JedisDataException("Expected data size " + responses.size() + " but was "
            + list.size());
      }
      for (int i = 0; i < list.size(); i++) {
        Response<?> response = responses.get(i);
        response.set(list.get(i));
        Object builtResponse;
        try {
          builtResponse = response.get();
        } catch (JedisDataException e) {
          builtResponse = e;
        }
        values.add(builtResponse);
      }
      return values;
    }
    public void setResponseDependency(Response<?> dependency) {
      for (Response<?> response : responses) {
        response.setDependency(dependency);
      }
    }
    public void addResponse(Response<?> response) {
      responses.add(response);
    }
  }
  @Override
  protected <T> Response<T> getResponse(Builder<T> builder) {
    if (currentMulti != null) {
      super.getResponse(BuilderFactory.STRING); // Expected QUEUED
      Response<T> lr = new Response<T>(builder);
      currentMulti.addResponse(lr);
      return lr;
    } else {
      return super.getResponse(builder);
    }
  }
  public void setClient(Client client) {
    this.client = client;
  }
  @Override
  protected Client getClient(byte[] key) {
    return client;
  }
  @Override
  protected Client getClient(String key) {
    return client;
  }
  public void clear() {
    if (isInMulti()) {
      discard();
    }
    sync();
  }
  public boolean isInMulti() {
    return currentMulti != null;
  }
  /**
   * Synchronize pipeline by reading all responses. This operation close the pipeline. In order to
   * get return values from pipelined commands, capture the different Response<?> of the
   * commands you execute.
   */
  public void sync() {
    if (getPipelinedResponseLength() > 0) {
      List<Object> unformatted = client.getAll();
      for (Object o : unformatted) {
        generateResponse(o);
      }
    }
  }
  /**
   * Synchronize pipeline by reading all responses. This operation close the pipeline. Whenever
   * possible try to avoid using this version and use Pipeline.sync() as it won't go through all the
   * responses and generate the right response type (usually it is a waste of time).
   * @return A list of all the responses in the order you executed them.
   */
  public List<Object> syncAndReturnAll() {
    if (getPipelinedResponseLength() > 0) {
      List<Object> unformatted = client.getAll();
      List<Object> formatted = new ArrayList<Object>();
      for (Object o : unformatted) {
        try {
          formatted.add(generateResponse(o).get());
        } catch (JedisDataException e) {
          formatted.add(e);
        }
      }
      return formatted;
    } else {
      return java.util.Collections.<Object> emptyList();
    }
  }
  public Response<String> discard() {
    if (currentMulti == null) throw new JedisDataException("DISCARD without MULTI");
    client.discard();
    currentMulti = null;
    return getResponse(BuilderFactory.STRING);
  }
  public Response<List<Object>> exec() {
    if (currentMulti == null) throw new JedisDataException("EXEC without MULTI");
    client.exec();
    Response<List<Object>> response = super.getResponse(currentMulti);
    currentMulti.setResponseDependency(response);
    currentMulti = null;
    return response;
  }
  public Response<String> multi() {
    if (currentMulti != null) throw new JedisDataException("MULTI calls can not be nested");
    client.multi();
    Response<String> response = getResponse(BuilderFactory.STRING); // Expecting
    // OK
    currentMulti = new MultiResponseBuilder();
    return response;
  }
  @Override
  public void close() throws IOException {
    clear();
  }
}      
Redis: pipeline加速Redis請求,實作批處理
  • Pipeline在某些場景下非常有用,比如有多個command需要被“及時的”送出,而且他們對相應結果沒有互相依賴,而且對結果響應也無需立即獲得,那麼pipeline就可以充當這種“批處理”的工具;而且在一定程度上,可以較大的提升性能,性能提升的原因主要是TCP連結中較少了“互動往返”的時間。

通過Jedis操作pipeline

@Test
    public void testPipeline() {
        Jedis jedis = null;
        Pipeline pipeline = null;
        try {
            // 建立一個jedis的對象。
            jedis = new Jedis("ip", 6379);
            jedis.auth("密碼");
            // 擷取一個管道對象
            pipeline = jedis.pipelined();
            // 删除已經存在的key
            pipeline.del("pipelinedList");
            // 循環添加
            for (int i = 0; i < 100; i++) {
                pipeline.rpush("pipelinedList",i+"");
            }
            // 執行
            pipeline.sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 關閉pipeline
            if(pipeline != null){
                try {
                    pipeline.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            // 關閉jedis。
            if(jedis != null){
                jedis.close();
            }
        }
    }      
Redis: pipeline加速Redis請求,實作批處理

pipelineVS事務

  • 管道和事務是不同的,pipeline隻是表達“互動”中操作的傳遞的方向性,pipeline也可以在事務中運作,也可以不在。
  • 無論如何,pipeline中發送的每個command都會被server立即執行,如果執行失敗,将會在此後的相應中得到資訊;也就是pipeline并不是表達“所有command都一起成功”的語義,管道中前面指令失敗,後面指令不會有影響,繼續執行。
  • 簡單來說就是管道中的指令是沒有關系的,它們隻是像管道一樣流水發給server,而不是串行執行,僅此而已;但是如果pipeline的操作被封裝在事務中,那麼将有事務來確定操作的成功與失敗。
  • pipeline 隻是把多個redis指令一起發出去,redis并沒有保證這些指定的執行是原子的;multi相當于一個redis的transaction的,保證整個操作的原子性,避免由于中途出錯而導緻最後産生的資料不一緻

pipelineVS腳本

  • 使用管道可能在效率上比使用script要好,但是有的情況下隻能使用script。因為在執行後面的指令時,無法得到前面指令的結果,就像事務一樣,是以如果需要在後面指令中使用前面指令的value等結果,則隻能使用script或者事務+watch。
  • 使用 Redis腳本 (在Redis版本2.6或更高版本中可用),可以使用執行伺服器端所需的大量工作的腳本更高效地處理一些 pipelining 用例。
  • 腳本的一大優勢是它能夠以最小的延遲讀取和寫入資料,使得讀取,計算,寫入等操作非常快速(在這種情況下,流水線操作無法提供幫助,因為用戶端先需要讀指令的回應,它才可以調用寫指令)。
  • 有時,應用程式可能還想在 pipeline 中發送 EVAL EVALSHA 指令。這是完全可能的,Redis通過 SCRIPT LOAD 指令明确地支援它(它保證可以調用 而沒有失敗的風險)。

來源:

Using pipelining to speedup Redis queries – Redis redis中的事務、lua腳本和管道的使用場景_fangjian1204的專欄-CSDN部落格_if redis.call