Okhttp3出来很久了,自己也用了一段时间了,之前一直都是生搬硬套的造轮子,今天根据网上的学习资料加上自己看了源码,把自己理解的部分记录下来。
OKhttp3特点
1.支持HTTP2/SADY 2.Socket选择最好的路线,并支持自动重连 3.拥有自动维护的Socket线程池,减少握手次数。 4.拥有队列线程池,轻松写并发 5.使用Interceptor处理请求响应(比如透明GZIP压缩) 6.基于Headers缓存策略
从网络请求入手
OkhttpClient.build 初始化
public OkHttpClient build() {
return new OkHttpClient(this);
}
OkhttpClient.newCall 其实是OkhttpClient实现了Call.Factory接口,
Call.Factory接口里面的newCall 方法实现
interface Factory {
Call newCall(Request request);
}
通过newCall又创建了一个新的Call , 而这个Call 是通过new RealCall来赋值的
@Override public Call newCall(Request request) {
return new RealCall(this, request);
}
RealCall 持有封装好的OkhttpClient,Requst,ps这里RetryAndFollowUpInterceptor先不做分析
protected RealCall(OkHttpClient client, Request originalRequest) {
this.client = client;
this.originalRequest = originalRequest;
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client);
}
同步请求
OkhttpClient.newCall().excute() 这个 newCall().excute() = new RealCall().excute 因为newCall 是通过 new RealCall 来赋值的
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this);
}
}
这段代码说明几个问题 1.判断这个RealCall是否已经执行,Call只能被执行一次 否则就报IllegalStateExcption
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
2.把创建好的RealClaa交给线程池来执行
client.dispatcher().executed(this);
3.通过getResponseWithInterceptorChain()获得返回结果Response
Response result = getResponseWithInterceptorChain();
4.最后还要通知dispatch执行完毕
client.dispatcher().finished(this);
dispatcher 线程池
private int maxRequests = 64; //最大并发请求数次
private int maxRequestsPerHost = 5; //主机最大请求数次
private Runnable idleCallback;
/** Executes calls. Created lazily. */
private ExecutorService executorService; //线程池
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); //准备执行的请求
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); // 正在执行的异步请求,包含已经取消但未执行完的请求
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); //正在执行的同步请求,包含已经取消单未执行完的请求
ExecutorService初始化配置
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0,//并发线程数如果为0,空闲一段时间销毁所有线程
Integer.MAX_VALUE, //并发线程数最大值
60, //当线程数大于并发线程的时候
TimeUnit.SECONDS,//多余空闲线程的最大存活时间
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
然后在继续分析ResponseWithInterceptorChain()
private Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());//重置所有拦截器
interceptors.add(retryAndFollowUpInterceptor);//负责连接失败之后重新定向重连
interceptors.add(new BridgeInterceptor(client.cookieJar()));//负责把用户构造的请求转换成发送的到服务器的请求,把服务器返回的响应换成用户的友好的响应
interceptors.add(new CacheInterceptor(client.internalCache()));//负责读取缓存,更新缓存
interceptors.add(new ConnectInterceptor(client));//连接服务器
if (!retryAndFollowUpInterceptor.isForWebSocket()) {
interceptors.addAll(client.networkInterceptors());//配置OkhttpClient,设置newworkInterceptors
}
interceptors.add(new CallServerInterceptor(//负责想服务器发送请求,从服务器读取请求
retryAndFollowUpInterceptor.isForWebSocket()));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}
既然是网络请求 就先从ConnctInterceptor开始,建立了一个HttpStream 分别对应Http1Code.http2Code(HTTP1.1/HTTP2)
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpStream httpStream = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpStream, connection);
}
发送数据和接收数据CallSeverInterceptor
@Override public Response intercept(Chain chain) throws IOException {
HttpStream httpStream = ((RealInterceptorChain) chain).httpStream();
StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
Request request = chain.request();
long sentRequestMillis = System.currentTimeMillis();
httpStream.writeRequestHeaders(request);
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
Sink requestBodyOut = httpStream.createRequestBody(request, request.body().contentLength());
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
}
httpStream.finishRequest();
Response response = httpStream.readResponseHeaders()
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
if (!forWebSocket || response.code() != 101) {
response = response.newBuilder()
.body(httpStream.openResponseBody(response))
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
int code = response.code();
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
1。向服务器发送了Request Header
2.如果有Request body,就向服务器发送 3.读取Response body,构造一个Response
4.如果有Response body 就在3的基础上增加一个新的Response
回到new RealCall.excute第四个问题 通知dispatch执行完毕
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
这段代码调用了一个promoteCalls()方法 把队列中的AsyncCall 变成runnigAsyncCall 异步
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
同步请求就分析到这里 , 下面看 异步请求
OkhttpClient.newCall()。equeue() 还是同样的道理 newClaa,equeue = new RealCall().equeue
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
client.dispatcher().equeue(new AsyncCall(responseCalllback));
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
如果disparch线程池还能执行并发请求,那么就立即执行,否则加入readAsyncCall 然后执行完毕会执行promoteCalls() 把readAsyncCall变成异步请求
最后放一张 流程图