Okhttp整体流程源码分析

Okhttp 使用

注意:这里分析的 Okhttp 的版本是 3.7.0 版本。

Okhttp 的使用分为 3 步。

  1. 创建 OkHttpClient 对象。
  2. 创建请求对象 Request 。
  3. 通过 OkHttpClient 来执行 Request。

GET 同步方式使用示例如下:

// 第一步,创建 OkHttpClient 对象
OkHttpClient client = new OkHttpClient();

// 第二步,创建请求对象 Request
Request request = new Request.Builder().url(GET_URL).build();

// 第三步,通过 OkHttpClient 来执行 Request
Response response = client.newCall(request).execute();

// 注意是  string() ,不是 toString()
System.out.println(response.body().string());

POST 同步方式使用示例如下:

MediaType JSON_TYPE = MediaType.parse("application/json; charset=utf-8")

    // 第一步,创建 OkHttpClient 对象
 OkHttpClient okHttpClient = new OkHttpClient();

 Blog blog = new Blog();
 blog.content = "新建的Blog";
 blog.title = "测试";
 blog.author = "怪盗kidou";

    // 这里请求对象用到的请求体,因为是 Post 请求
 RequestBody requestBody = RequestBody.create(JSON_TYPE, JSON.toJSONString(blog) );

 // 第二步,创建请求对象 Request
 Request request = new Request.Builder().url(POST_URL).post(requestBody).build();

 Response response = okHttpClient.newCall(request).execute();

 System.out.println(response.body().string());

GET 异步方式使用示例如下(POST 异步类似):

OkHttpClient client = new OkHttpClient();

Request request = new Request.Builder().url(GET_URL).build();

// 用回调的方式获取对象,注意回调方法是在后台线程执行的,不是主线程
client.newCall(request).enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {

    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {
        // 注意是  string() ,不是 toString()
        System.out.println("当前的线程 == "+ Thread.currentThread().getName()+"  内容为:"+response.body().string());
    }
});

Okhttp 源码分析

OkHttpClient 类分析

public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {

  final Dispatcher dispatcher;    // 任务分发器
  final Proxy proxy;
  final List<Protocol> protocols;
  final List<ConnectionSpec> connectionSpecs;
  final List<Interceptor> interceptors;
  final List<Interceptor> networkInterceptors;
  final EventListener.Factory eventListenerFactory;
  final ProxySelector proxySelector;
  final CookieJar cookieJar;
  final Cache cache;
  final InternalCache internalCache;
  final SocketFactory socketFactory;
  final SSLSocketFactory sslSocketFactory;
  final CertificateChainCleaner certificateChainCleaner;
  final HostnameVerifier hostnameVerifier;
  final CertificatePinner certificatePinner;
  final Authenticator proxyAuthenticator;
  final Authenticator authenticator;
  final ConnectionPool connectionPool;
  final Dns dns;
  final boolean followSslRedirects;
  final boolean followRedirects;
  final boolean retryOnConnectionFailure;
  final int connectTimeout;
  final int readTimeout;
  final int writeTimeout;
  final int pingInterval;

  public OkHttpClient() {
    // 创建默认的 Builder 内容
    this(new Builder());
  }
}

这个类的参数非常多,内部是通过 Builder 模式来设置数据的,该对象包装了很多功能模块并对外提供统一的 API,这是典型的外观设计模式。这里先知道有这样一个类和一些参数,具体作用的在后面再分析。后面的分析也是这样,先理解整体流程,再详细分析。

注意:官方推荐的使用方式是使用一个全局的 OkHttpClient 在多个类之间共享。因为每个 Client 都会有一个自己的连接池和线程池,复用 Client 可以减少资源的浪费。

Request 类分析

这个类封装了请求用到的基本元素(请求地址、 请求方法、 请求头、 请求体等)。

public final class Request {
  final HttpUrl url;    // 请求地址
  final String method;    // 方法类型
  final Headers headers;    // 请求头
  final RequestBody body;    // 请求体
  final Object tag;

    // 懒初始化(在获取时才初始化)
  private volatile CacheControl cacheControl; // Lazily initialized.

  Request(Builder builder) {
    this.url = builder.url;
    this.method = builder.method;
    this.headers = builder.headers.build();
    this.body = builder.body;
    this.tag = builder.tag != null ? builder.tag : this;
  }

  public Builder newBuilder() {
    return new Builder(this);
  }

   //当没有 Cache-Control 请求头时,才会为 Null
  public CacheControl cacheControl() {
    CacheControl result = cacheControl;
    // 在获取该对象时才初始化
    return result != null ? result : (cacheControl = CacheControl.parse(headers));
  }

}

由源码可知,该类也是应用了 Buidler 设计模式,里面的 RequestBody 对象用来保存请求体的内容 ,这是一个抽象类,里面有一些创建 RequestBody 实例的静态方法,比如:

   // 生成一个用于网络传输的请求体对象
public static RequestBody create(MediaType contentType, String content) {
  Charset charset = Util.UTF_8;
  if (contentType != null) {
    charset = contentType.charset();
    if (charset == null) {
      charset = Util.UTF_8;
      contentType = MediaType.parse(contentType + "; charset=utf-8");
    }
  }
  byte[] bytes = content.getBytes(charset);
  return create(contentType, bytes);
}

系统对该类有两个实现的子类,分别对应两种不同的 MIME 类型:

  • FormBody : “application/x-www-form-urlencoded” ,用于表单提交的。

  • MultipartBody : “multipart/“+xxx ,用于文件上传的。

Call 类分析

有了 OkHttpClientRequest 对象,接下来会调用 client.newCall(request) 方法:

  @Override
public Call newCall(Request request) {
  return new RealCall(this, request, false /* for web socket */);
}

这是接口实现的方法,返回了一个 RealCall 对象,并把当前对象和 Request 传入,所以可以很方便地使用这两个对象。实际上,就是通过 RealCall 来操作请求的,这是一个非常核心的类,整体的请求流程都是在这个类完成。 RealCall 类实现了 Call 接口,那么我们先分析一下 Call 接口类:

public interface Call extends Cloneable {      
    // 返回当前请求
  Request request();
    //同步执行请求
  Response execute() throws IOException;
    //异步执行请求
  void enqueue(Callback responseCallback);
    // 是否被执行
  boolean isExecuted();
    // 是否取消执行
  boolean isCanceled();
  Call clone();
  // 工厂接口,给 OkHttpClient 实现来生成 RealCall 对象的
  interface Factory {
    Call newCall(Request request);
  }
}

该类主要是规范请求相关的操作,我们来重点分析一下 Call 接口的实现类 RealCall

final class RealCall implements Call {
  final OkHttpClient client;
  final RetryAndFollowUpInterceptor retryAndFollowUpInterceptor;
  final Request originalRequest;

  RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    final EventListener.Factory eventListenerFactory = client.eventListenerFactory();
    // 参数赋值
    this.client = client;
    this.originalRequest = originalRequest;
    this.forWebSocket = forWebSocket;
    // 该对象是一个拦截器对象,处理请求失败的重试、重定向
    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
  }
}

该类的构造函数主要是进行了参数赋值,然后创建了一个 RetryAndFollowUpInterceptor 对象 。接下来我们先分析同步使用的情况。

RealCall 同步请求逻辑分析

先上源码:

@Override public Response execute() throws IOException {
  synchronized (this) {
      // 1.每个 Call 对象只能使用一次
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  // 2.捕获这个请求的 StackTrace
  captureCallStackTrace();
  try {

      // 3.将请求任务添加到同步队列
    client.dispatcher().executed(this);
      // 4.开始调用 chain 的每一个 interceptor,并拿到结果 response
    Response result = getResponseWithInterceptorChain();
    if (result == null) throw new IOException("Canceled");
    return result;
  } finally {
      // 5.执行完成,将任务从队列中移除
    client.dispatcher().finished(this);
  }
}

总体来说,分为 5 步:

  1. 先判断该请求是否已经在执行,是则抛异常,否则把状态设置为“执行中”。
  2. 追踪方法调用栈。
  3. 将请求任务添加到任务分发器的同步队列。
  4. 执行拦截器调用链,并拿到结果。
  5. 将任务从任务分发器的同步队列中移除。

第二步的 captureCallStackTrace() 方法如下:

private void captureCallStackTrace() {
   // 调用了 AndroidPlatform
  Object callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");
  retryAndFollowUpInterceptor.setCallStackTrace(callStackTrace);
}

该方法主要是获取到一个方法调用栈对象,并把对象传入了 OkHttp 的第一个拦截器对象 RetryAndFollowUpInterceptor

public void setCallStackTrace(Object callStackTrace) {
  this.callStackTrace = callStackTrace;
}

第三步的 client.dispatcher().executed(this) 方法,先通过 dispatcher() 方法来获得 Dispatcher 对象:

public Dispatcher dispatcher() {
  return dispatcher;
}

那么这个 Dispatcher 是在哪里创建的呢?其实是在 OkHttpClientBuilder 方法里创建的,如下:

// OkHttpClient.java
public Builder() {
  dispatcher = new Dispatcher();
    ...
}

那么,我们分析一下 Dispatcher 类:

public final class Dispatcher {
    // 最大的请求任务数量
  private int maxRequests = 64;        
    //同一个主机的最大请求任务数量
  private int maxRequestsPerHost = 5;    
    // 没有可执行的请求任务时,即空闲时执行一些任务
  private Runnable idleCallback;    
  // 线程池,可以在构造中传入或执行异步请求时再初始化
  private ExecutorService executorService;
  // 存放待执行任务的的队列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  // 存放异步请求任务的队列
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  // 存放同步请求任务的队列
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
}

同步执行的方法如下:

synchronized void executed(RealCall call) {
    // 将请求添加到队列中
  runningSyncCalls.add(call);
}

方法很简单,就是把当前运行的 RealCall 放进存放同步请求任务的队列中,然后调用 getResponseWithInterceptorChain() 拿到返回结果,这个方法非常重要,短短几行代码就实现了对请求的所有处理,它体现了 OkHttp 中一个很重要的核心设计——拦截器机制。同步请求先分析到这里,后面会重点分析拦截器机制。接下来执行的 Dispatherfinish 方法如下:

void finished(RealCall call) {
  // 注意第3个参数传入的是 false
  finished(runningSyncCalls, call, false);
}

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
  int runningCallsCount;
  Runnable idleCallback;
  synchronized (this) {
    if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    // 每次 remove 后,执行 promoteCalls 来轮转(异步任务才需要轮转,同步只移除就行)
    if (promoteCalls) promoteCalls();
    runningCallsCount = runningCallsCount();
    idleCallback = this.idleCallback;
  }

  // 没有可执行的请求任务时会调用
  if (runningCallsCount == 0 && idleCallback != null) {
    idleCallback.run();
  }
}

该方法在同步请求时,只是移除相关的请求任务,并没有什么特殊的操作。

RealCall 异步请求逻辑分析

异步请求跟同步请求有许多相似的流程,下面的分析涉及相同代码时不再贴出源码。异步请求时,会调用 RealCallenqueue 方法,如下:

@Override public void enqueue(Callback responseCallback) {
  synchronized (this) {
      // 任务是否正在请求
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
      // 捕获这个请求的 StackTrace
  captureCallStackTrace();
      // 将请求任务添加到异步队列
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

该方法需要传入一个回调对象,其他步骤跟同步方法类似,不过最后是调用了 Dispatherenqueue 方法 ,并传入了一个 AsyncCall 对象。

  // Dispather.java
synchronized void enqueue(AsyncCall call) {
    //如果正在执行的请求小于设定值即64,并且请求同一个主机的request小于设定值即5
  if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      // 将请求任务添加到异步请求队列中
    runningAsyncCalls.add(call);
      // 执行这个任务
    executorService().execute(call);
  } else {
      // 将请求任务添加到等请求队列中,等待调用时机
    readyAsyncCalls.add(call);
  }
}

根据源码和注释大家可以看到如果正在执行的异步请求小于 64 ,并且请求同一个主机小于 5 的时候就先往正在运行的队列里面添加这个 call ,然后用线程池去执行这个 call ,否则就把他放到等待队列里面。我们先重点看一下 executorService() 方法,这个方法返回一个 ExecutorService 对象:

public synchronized ExecutorService executorService() {
  if (executorService == null) {
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
  }
  return executorService;
}

其实可以通过 OkhttpClientbuidler 方法来设置自定义的 ExecutorService 对象,如果没有设置,则用上述方法来创建一个线程池,我们来分析一下它的几个参数。

  • 核心线程数 corePoolSize:保持在线程池中的线程数,由于为 0 ,因此任何线程空闲时都不会被保留。

  • 最大线程数 maximumPoolSize:线程池最大支持创建的线程数,这里指定了 Integer.MAX_VALUE

  • 线程存活时间 keepAliveTime:线程空闲后所能存活的时间,若超过该时间就会被回收。这里指定的时间为 60 个时间单位(60s),也就是说线程在空闲超过 60s 后就会被回收。

  • 时间单位 unit:签名的线程存活时间的单位,这里为 TimeUnit.SECONDS 。

  • 线程等待队列 workQueue:线程的等待队列,里面的元素会按序排队,依次执行,这里和指定的是 SynchronousQueue

  • 线程工厂 threadFactory:线程的创建工厂这里传入的是 Util.threadFactory 方法创建的线程工厂。

疑问一:为什么要采用 SynchronousQueue ?

首先我们先需要了解一下什么是 SynchronousQueue ,它虽然是一个队列,但它内部不存在任何的容器,它采用了一种经典的生产者——消费者模型,它有多个生产者和消费者,当一个生产线程进行生产操作(put)时,若没有消费者线程进行消费(take),那么该线程会阻塞,直到有消费者进行消费。也就是说,它仅仅实现了一个传递的操作,这种传递功能由于没有了中间的放入容器,再从容器中取出的过程,因此是一种快速传递元素的方式,这对于我们网络请求这种高频请求来说,是十分合适的。

疑问二:为什么线程池线程数量不设上限 ?

虽然线程池没设置数量上限,实际上线程数量是在 Dispather 中进行维护的,上述分析过程中有提到最大请求任务数量和同一个主机最大的请求数量,其实就是在控制线程的数量,不会让线程无限地创建。

上面我们讲到,会通过线程池来执行 AsyncCall 对象,很显然这是一个 Runnable 对象,源码如下:

  // RealCall.java
final class AsyncCall extends NamedRunnable {
  private final Callback responseCallback;

  AsyncCall(Callback responseCallback) {
    super("OkHttp %s", redactedUrl());
    this.responseCallback = responseCallback;
  }       
}

AsyncCall 类其实是 RealCall 的内部类,然后继承于 NamedRunnable 对象,NamedRunnable 是一个抽象类,源码如下:

// 该类主要是给执行该任务的线程命名
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
      // 设置了当前线程的name
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
        // 给子类回调该抽象方法
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

那么接下来肯定是执行 AsyncCallexecute 方法:

@Override protected void execute() {
      boolean signalledCallback = false;
      try {
          // 开始调用 chain 的每一个 interceptor,并拿到结果 response
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          //回调,注意这里回调是在线程池中,不是主线程
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          // 进行 callback 的回调,不是主线程
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
      // 执行完成,将任务从队列中移除
        client.dispatcher().finished(this);
      }
    }

该方法跟同步执行时一样,先调用 getResponseWithInterceptorChain() 来获取响应结果,然后通过回调方法返回,最后执行 Dispatcherfinished 方法,由于 getResponseWithInterceptorChain() 涉及到拦截器机制,这里先分析异步请求时的 finish 方法:

void finished(AsyncCall call) {
  finished(runningAsyncCalls, call, true);
}

该方法跟同步时传的入参有点不一样,这里第 3 个参数传入了 true ,由上面分析同步时的源码可知,如果是 true ,那么将执行 promoteCalls() 方法。

  // 负责ready的Call到running的Call的转化
private void promoteCalls() {
//如果当前执行的线程大于maxRequests(64),则不操作
  if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
  // 没有待执行的任务了
  if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

  for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
    AsyncCall call = i.next();

    if (runningCallsForHost(call) < maxRequestsPerHost) {
        // 从等待队列中移除掉
      i.remove();
      // 添加到存放异步请求任务的队列
      runningAsyncCalls.add(call);
        // 通过线程池来执行
      executorService().execute(call);
    }

    // 达到最大的容量了
    if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
  }
} 

这个方法的作用是: 如果条件满足,那么就将等待执行的任务从等待队列中移除,然后放进正在执行的队列中,最后通过线程池执行。

拦截器机制

OkHttp 的网络请求的过程就是依赖于各种拦截器(Interceptor)实现的,我们先看看 Interceptor 的定义:

public interface Interceptor {
    // 负责拦截
  Response intercept(Chain chain) throws IOException;

  interface Chain {
    Request request();

    // 负责分发、前行
    Response proceed(Request request) throws IOException;

    Connection connection();
  }
}

Interceptor 实际上是一个接口,里面只有一个方法 intercept 以及一个内部接口 Chain

上述分析中讲到,无论是同步还是异步请求,最终会通过 getResponseWithInterceptorChain() 方法来获取结果的,源码如下:

Response getResponseWithInterceptorChain() throws IOException {

  List<Interceptor> interceptors = new ArrayList<>();
  // 添加开发者自定义的 interceptors
  interceptors.addAll(client.interceptors());
  // 处理请求失败的重试、重定向
  interceptors.add(retryAndFollowUpInterceptor);
  // 添加一些请求的头部或其他信息,并对返回的 response 做一些友好的处理
  interceptors.add(new BridgeInterceptor(client.cookieJar()));
  // 判断缓存是否存在、读取缓存、更新缓存
  interceptors.add(new CacheInterceptor(client.internalCache()));
  // 建立客户端和服务端的连接
  interceptors.add(new ConnectInterceptor(client));
  if (!forWebSocket) {
      // 添加开发者自定义的网络层拦截器
    interceptors.addAll(client.networkInterceptors());
  }

  // 负责向服务器发送请求数据、从服务器读取响应数据
  // 这个 interceptor 不会再递归调用了,面是直接返回 response 给上一个 interceptor
  interceptors.add(new CallServerInterceptor(forWebSocket));

  // 一个包装类,包含 interceptors 和 request
  Interceptor.Chain chain = new RealInterceptorChain(
      interceptors, null, null, null, 0, originalRequest);

  // 开始调用 chain ,默认从0索引开始,即从第一个 interceptor 开始
  return chain.proceed(originalRequest);
}

该方法就是不断地添加各种 Interceptor ,然后创建了一个 RealInterceptorChain 对象,并把 Interceptor 列表传入,最后执行 chain.proceed(originalRequest) 方法。

注意:这里添加 Interceptor 的顺序非常重要,首先添加的是开发者自定义的各种 Interceptor ,然后才添加系统的 RetryAndFollowUpInterceptor 对象,具体看注释说明。

接下来,我们重点看一下 RealInterceptorChain 的方法:

@Override public Response proceed(Request request) throws IOException {
  return proceed(request, streamAllocation, httpCodec, connection);
}

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
    RealConnection connection) throws IOException {

  // 记录创建实例的个数
  calls++;

  // 创建一个 RealInterceptorChain 实例   ,并将索引加1,即调用下一个 interceptor
  // 这里面是一个递归调用,分别遍历调用了所有的 interceptor ,然后从最后一个开始返回 response
  RealInterceptorChain next = new RealInterceptorChain(
      interceptors, streamAllocation, httpCodec, connection, index + 1, request);
  // 取出下一个 interceptor
  Interceptor interceptor = interceptors.get(index);
  // 执行intercept方法,拦截器又会调用proceed()方法
  Response response = interceptor.intercept(next);

  return response;
}

proceed 方法里会再次创建一个 RealInterceptorChain 对象,但是传入的 index 参数会加 1 ,这个参数用来获取当前需要使用的拦截器,然后获取对应索引的拦截器,默认索引从 0 开始,最后执行拦截器的 intercept 方法,并把新的 RealInterceptorChain 对象传入,如果不是最后一个拦截器,则在拦截器里面又会调用proceed() 方法,通过这种递归的方式来遍历执行所有的拦截器。

实际上,intercept 方法往往是如下的结构:

@Override 
public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    // Request阶段,该拦截器在Request阶段负责做的事情

    // 调用RealInterceptorChain.proceed(),其实是在递归调用下一个拦截器的intercept()方法
    response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);

    // Response阶段,完成了该拦截器在Response阶段负责做的事情,然后返回到上一层的拦截器。
    return response;     
}

上面简单的三行代码将整个 intercept 过程分为了两个阶段:

  • Request 阶段:执行一些该拦截器在 Request 阶段所负责的事情。

  • Response 阶段:完成该拦截器在 Response 阶段所负责的事情。

这其实是采用了一种递归的设计,将 OkHttp 的请求分为了几个阶段,分别代表了不同的拦截器,不同拦截器分别会在这个递归的过程中有两次对该请求的处理的可能,一次是在 Request 之前,一次是在 Response 之后,中间的过程中若出现了错误,则通过抛出异常来通知上层。

其拦截器的整体执行流程如下:

框架的整体请求流程如下: