天天看点

Okhttp3源码浅析

Okhttp3出来很久了,自己也用了一段时间了,之前一直都是生搬硬套的造轮子,今天根据网上的学习资料加上自己看了源码,把自己理解的部分记录下来。

OKhttp3特点

1.支持HTTP2/SADY 2.Socket选择最好的路线,并支持自动重连 3.拥有自动维护的Socket线程池,减少握手次数。 4.拥有队列线程池,轻松写并发 5.使用Interceptor处理请求响应(比如透明GZIP压缩) 6.基于Headers缓存策略

从网络请求入手

OkhttpClient.build   初始化

public OkHttpClient build() {
     	 return new OkHttpClient(this);
    	}
           

OkhttpClient.newCall  其实是OkhttpClient实现了Call.Factory接口,

      Call.Factory接口里面的newCall 方法实现

interface Factory {
  	  Call newCall(Request request);
 	 }
           

         通过newCall又创建了一个新的Call  , 而这个Call 是通过new RealCall来赋值的

@Override public Call newCall(Request request) {
   		 return new RealCall(this, request);
  	}
           

RealCall 持有封装好的OkhttpClient,Requst,ps这里RetryAndFollowUpInterceptor先不做分析

protected RealCall(OkHttpClient client, Request originalRequest) {
   	 this.client = client;
   	 this.originalRequest = originalRequest;
         this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client);
 	}
           

同步请求

OkhttpClient.newCall().excute()  这个 newCall().excute() = new RealCall().excute  因为newCall 是通过 new RealCall 来赋值的

@Override public Response execute() throws IOException {
   		 synchronized (this) {
   	 	     if (executed) throw new IllegalStateException("Already Executed");
   	 	 		 executed = true;
   		     }
   	             try {
   	  		 client.dispatcher().executed(this);
   	  		 Response result = getResponseWithInterceptorChain();
   	  		 if (result == null) throw new IOException("Canceled");
   	  		 return result;
   		    } finally {
   		   	client.dispatcher().finished(this);
   		    }
	       }
           

这段代码说明几个问题 1.判断这个RealCall是否已经执行,Call只能被执行一次 否则就报IllegalStateExcption

if (executed) throw new IllegalStateException("Already Executed");
   	 	 		 executed = true;
   		     }
           

2.把创建好的RealClaa交给线程池来执行

client.dispatcher().executed(this);
           

3.通过getResponseWithInterceptorChain()获得返回结果Response

Response result = getResponseWithInterceptorChain();
           

4.最后还要通知dispatch执行完毕

client.dispatcher().finished(this);
           

dispatcher 线程池

private int maxRequests = 64;     //最大并发请求数次
 	 private int maxRequestsPerHost = 5;	//主机最大请求数次
 	 private Runnable idleCallback;

  	/** Executes calls. Created lazily. */
 	 private ExecutorService executorService;    //线程池

 	 /** Ready async calls in the order they'll be run. */
  	private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();  //准备执行的请求

 	 /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
	  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); // 正在执行的异步请求,包含已经取消但未执行完的请求 

	  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
 	 private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();  //正在执行的同步请求,包含已经取消单未执行完的请求 
           

ExecutorService初始化配置

public synchronized ExecutorService executorService() {
    		if (executorService == null) {
    		  executorService = new ThreadPoolExecutor(0,//并发线程数如果为0,空闲一段时间销毁所有线程
 		  Integer.MAX_VALUE, //并发线程数最大值
		  60, //当线程数大于并发线程的时候
	          TimeUnit.SECONDS,//多余空闲线程的最大存活时间
        	  new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
   		 }	
   		 return executorService;
 	 }
           

然后在继续分析ResponseWithInterceptorChain()

private Response getResponseWithInterceptorChain() throws IOException {
	    // Build a full stack of interceptors.
	    List<Interceptor> interceptors = new ArrayList<>();
	    interceptors.addAll(client.interceptors());//重置所有拦截器
	    interceptors.add(retryAndFollowUpInterceptor);//负责连接失败之后重新定向重连
 	    interceptors.add(new BridgeInterceptor(client.cookieJar()));//负责把用户构造的请求转换成发送的到服务器的请求,把服务器返回的响应换成用户的友好的响应
 	    interceptors.add(new CacheInterceptor(client.internalCache()));//负责读取缓存,更新缓存
	    interceptors.add(new ConnectInterceptor(client));//连接服务器
 	   if (!retryAndFollowUpInterceptor.isForWebSocket()) {
	      interceptors.addAll(client.networkInterceptors());//配置OkhttpClient,设置newworkInterceptors
	    }
	    interceptors.add(new CallServerInterceptor(//负责想服务器发送请求,从服务器读取请求
	        retryAndFollowUpInterceptor.isForWebSocket()));

	    Interceptor.Chain chain = new RealInterceptorChain(
	        interceptors, null, null, null, 0, originalRequest);
 	   return chain.proceed(originalRequest);
 	 }
           

既然是网络请求 就先从ConnctInterceptor开始,建立了一个HttpStream 分别对应Http1Code.http2Code(HTTP1.1/HTTP2)

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpStream httpStream = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpStream, connection);
  }
           

发送数据和接收数据CallSeverInterceptor

@Override public Response intercept(Chain chain) throws IOException {
    HttpStream httpStream = ((RealInterceptorChain) chain).httpStream();
    StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
    Request request = chain.request();

    long sentRequestMillis = System.currentTimeMillis();
    httpStream.writeRequestHeaders(request);

    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      Sink requestBodyOut = httpStream.createRequestBody(request, request.body().contentLength());
      BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
      request.body().writeTo(bufferedRequestBody);
      bufferedRequestBody.close();
    }

    httpStream.finishRequest();

    Response response = httpStream.readResponseHeaders()
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    if (!forWebSocket || response.code() != 101) {
      response = response.newBuilder()
          .body(httpStream.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    int code = response.code();
    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }
           

1。向服务器发送了Request Header

2.如果有Request body,就向服务器发送 3.读取Response body,构造一个Response

4.如果有Response body 就在3的基础上增加一个新的Response

回到new RealCall.excute第四个问题 通知dispatch执行完毕

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }
           

这段代码调用了一个promoteCalls()方法  把队列中的AsyncCall 变成runnigAsyncCall 异步

private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }
           

同步请求就分析到这里 , 下面看 异步请求

OkhttpClient.newCall()。equeue() 还是同样的道理 newClaa,equeue = new RealCall().equeue

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }
           

client.dispatcher().equeue(new AsyncCall(responseCalllback));

synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }
           

如果disparch线程池还能执行并发请求,那么就立即执行,否则加入readAsyncCall 然后执行完毕会执行promoteCalls() 把readAsyncCall变成异步请求

最后放一张 流程图

Okhttp3源码浅析

继续阅读