天天看点

OkHttp源码解析之请求数据与回调1 使用2 同步与异步请求3 拦截器原理

1 使用

首先从一个简单使用例子开始入手,下面的是源码中的一个例子:

OkHttpClient client = new OkHttpClient();//步骤1

    // Create request for remote resource.
    Request request = new Request.Builder()
        .url(ENDPOINT)
        .build();//步骤2

    // Execute the request and retrieve the response.
    try (Response response = client.newCall(request).execute()) {//步骤3
      // Deserialize HTTP response to concrete type.
      ResponseBody body = response.body();
      List<Contributor> contributors = CONTRIBUTORS_JSON_ADAPTER.fromJson(body.source());

      // Sort list by the most contributions.
      Collections.sort(contributors, (c1, c2) -> c2.contributions - c1.contributions);

      // Output list of contributors.
      for (Contributor contributor : contributors) {
        System.out.println(contributor.login + ": " + contributor.contributions);
      }
    }
           

步骤1: 创建一个OkHttpClient对象。

步骤2:通过建造者模式创建一个请求,这边可以配置Url等数据。

步骤3:加载请求并执行等待响应。

至此一个基本的请求过程编写完成。

2 同步与异步请求

Call对象由client的newCall生成,其中有两种方式。第一种是execute(),它会阻塞当前线程。第二种是enqueue,它并不会阻塞线程,而是将任务交给线程池去处理,处理完再将结果 返回回来。那么接下来看看client.newCall(request)是怎么一回事:

@Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }
           

可见它是通过OKHttpClient,Request等创建了一个新的RealCall对象。那么这个实际执行请求回调操作的也就是RealCall对象。先看看它的同步方式:

@Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    transmitter.timeoutEnter();
    transmitter.callStart();
    try {
      client.dispatcher().executed(this);
      return getResponseWithInterceptorChain();
    } finally {
      client.dispatcher().finished(this);
    }
  }

synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }
           

流程如下:execute() --> client.dispatcher().executed(this) --> runningSyncCalls.add(call) --> getResponseWithInterceptorChain() --> client.dispatcher().finished(this)

首先是将该call加入runningSyncCalls,这表示该任务已经子啊执行了。然后调用getResponseWithInterceptorChain方法也就是开始请求了。这都是发生在当前线程所以会阻塞线程。最后会调用dispatcher的finish方法,该方法会去通知线程池开始去运行等待任务,这个finish方法方法后面会分析。

@Override public void enqueue (Callback responseCallback) {
    。。。
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

void enqueue(AsyncCall call) {
    synchronized (this) {
      readyAsyncCalls.add(call);

      // Mutate the AsyncCall so that it shares the AtomicInt2 同步与异步请求eger of an existing running call to
      // the same host.
      if (!call.get().forWebSocket) {
        AsyncCall existingCall = findExistingCallWithHost(call.host());
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
      }
    }
    promoteAndExecute();
  }
           

流程如下:client.dispatcher().enqueue --> readyAsyncCalls.add(call) --> promoteAndExecute()

解释一下,就是先把call加入到readyAsyncCalls中去,然后promoteAndExecute()通知线程池去取出一个call来处理。

那么就有必要看下promoteAndExecute()方法是如何实现的:

private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {//步骤1
        AsyncCall asyncCall = i.next();

        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
        asyncCall.callsPerHost().incrementAndGet();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }

    for (int i = 0, size = executableCalls.size(); i < size; i++) {//步骤2
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }
           

步骤1:遍历readyAsyncCalls,有两种特殊情况会停止或者继续循环。

  • 当runningAsyncCalls超过maxRequests的话说明线程要处理的任务已经满了直接跳出循环。
  • 当asyncCall.callsPerHost().get()也就是当前call所对应的host已经接收了足够过的call请求,那么循环继续。

如果这两个都不满足执行下面的步骤,主要是往executableCalls中添加要执行的请求以及往runningAsyncCalls里添加要执行的请求。

步骤2:遍历刚才的得到的executableCalls,并用调用线程池执行任务。具体是调用AsyncCall的executeOn(executorService())方法实现的。

现在看看异步回调AsyncCall的源码:

final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;
    private volatile AtomicInteger callsPerHost = new AtomicInteger(0);

    。。。

    /**
     * Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
     * if the executor has been shut down by reporting the call as failed.
     */
    void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));
      boolean success = false;
      try {
        executorService.execute(this);
        success = true;
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        transmitter.noMoreExchanges(ioException);
        responseCallback.onFailure(RealCall.this, ioException);
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }

    @Override protected void execute() {
      boolean signalledCallback = false;
      transmitter.timeoutEnter();
      try {
        Response response = getResponseWithInterceptorChain();
        signalledCallback = true;
        responseCallback.onResponse(RealCall.this, response);
      } catch (IOException e) {
        。。。
      } catch (Throwable t) {
        。。。
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }
  
  public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

           

AsyncCall继承自NamedRunnable。所以调用流程如下所示:

executeOn --> executorService.execute(this) --> run() --> execute()

此时就开始在线程池中取一个线程执行,先调用getResponseWithInterceptorChain()执行请求,再调用responseCallback.onResponse(RealCall.this, response)将的到的响应数据回调回去。

和同步execute一样最后都会执行finished(this),那么看下finished里面是怎么操作的:

/** Used by {@code AsyncCall#run} to signal completion. */
  void finished(AsyncCall call) {//步骤1
    call.callsPerHost().decrementAndGet();
    finished(runningAsyncCalls, call);
  }

  /** Used by {@code Call#execute} to signal completion. */
  void finished(RealCall call) {//步骤2
    finished(runningSyncCalls, call);
  }

  private <T> void finished(Deque<T> calls, T call) {//步骤3
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      idleCallback = this.idleCallback;
    }

    boolean isRunning = promoteAndExecute();

    if (!isRunning && idleCallback != null) {
      idleCallback.run();
    }
  }
           

不管是步骤1还是步骤2,最后都会跳到步骤3.而步骤3最主要的一步就是 promoteAndExecute()。也就是说每个任务完成之后都会执行promoteAndExecute(),以确保等待列表中的任务会被不断的执行。

3 拦截器原理

从上面的分析可知,getResponseWithInterceptorChain()方法用于获取网络数据,那么看看源码:

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
        originalRequest, this, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());//步骤1

    boolean calledNoMoreExchanges = false;
    try {
      Response response = chain.proceed(originalRequest);//步骤2
      if (transmitter.isCanceled()) {
        closeQuietly(response);
        throw new IOException("Canceled");
      }
      return response;
    } catch (IOException e) {
      calledNoMoreExchanges = true;
      throw transmitter.noMoreExchanges(e);
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null);
      }
    }
  }
           

用户定义的拦截器再加上系统自带的6个拦截器一共有全部塞如interceptors中去。

步骤1 直观的比喻就是将这些拦截器用链条连接起来,这个是链条的第一个节点他的 index = 0。

步骤2 开始往链条上传递请求参数。层层传递过去获取到数据之后层层传递回来:

那么就看看proceed是怎么处理的:

@Override public Response proceed(Request request) throws IOException {
    return proceed(request, transmitter, exchange);
  }

  public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
      throws IOException {
    。。。

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout);//步骤1
    Interceptor interceptor = interceptors.get(index);//步骤2
    Response response = interceptor.intercept(next);//步骤3

   。。。

    return response;
  }
           

步骤1:封装index,RealInterceptorChain对象next的indext在每次递归调用之后都会加1

步骤2:这边的index是指当前Interceptor对应的下一个Interceptor。

步骤3:将包含当前Interceptor对应的下下个Interceptor的index以及请求等参数封装成next对象,让下一个interceptor处理next中的请求并继续递归。

比如说RetryAndFollowUpInterceptor。如果没有用户自定义的拦截器,那么这将是第一个拦截器,看看它的intercept方法:

@Override public Response intercept(Chain chain) throws IOException {
     。。。//步骤1
         RealInterceptorChain realChain = (RealInterceptorChain) chain;
        response = realChain.proceed(request, transmitter, null);//步骤2
      。。。//步骤3
  }
           

步骤1:对请求的数据进行处理

步骤2:将请求抛给下个拦截器。此时realChain的index是1。它调用RealInterceptorChain的proceed方法,此时去看下是怎么变化的:

@Override public Response proceed(Request request) throws IOException {
    return proceed(request, transmitter, exchange);
  }

  public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
      throws IOException {
    。。。

    RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout);//此时的next对应的index是2,他对应的是CacheInterceptor拦截器
    Interceptor interceptor = interceptors.get(index );//此时的值是1,他对应的是BridgeInterceptor拦截器
    Response response = interceptor.intercept(next);

   。。。

    return response;
  }
           

可以看到此时interceptor.intercept(next)方法的功能是执行BridgeInterceptor拦截器,并且下一个递归到的是CacheInterceptor拦截

步骤3:对响应的数据进行处理

这种递归到最后肯定是直接返回的,看看最后一个拦截器是怎么操作的:

@Override public Response intercept(Chain chain) throws IOException {
    。。。

    Response response = responseBuilder
        .request(request)
        .handshake(exchange.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

   。。。

    return response;
  }
           

可见proceed没有被调用它直接处理了请求并将响应反馈回来(并没有执行proceed操作也就是说没有递归了),那么接下来响应也会层层的递归回来。最终反馈到getResponseWithInterceptorChain()得到结果。

整体流程如下图所示:

OkHttp源码解析之请求数据与回调1 使用2 同步与异步请求3 拦截器原理

先是将请求数据层层递归下去,最后在RealInterceptorChain做最终的处理,得到结果后又层层递归回来。