OkHttp

// Client instance that run() below issues its calls through.
OkHttpClient client = new OkHttpClient();

/**
 * Executes a request for {@code url} synchronously and returns the response body as a string.
 *
 * @param url the address to request
 * @return the response body read into a string
 * @throws IOException declared by the signature; propagated from executing the call or reading the body
 */
String run(String url) throws IOException {
  Request.Builder builder = new Request.Builder().url(url);
  Request request = builder.build();

  // try-with-resources closes the response after the body has been read.
  try (Response response = client.newCall(request).execute()) {
    String text = response.body().string();
    return text;
  }
}

client.newCall –> RealCall(上面的示例使用同步的 execute();下面以异步的 enqueue() 为例分析调用流程)

#RealCall.kt#
/**
 * Hands this call to the client's dispatcher for asynchronous execution.
 *
 * @param responseCallback callback wrapped in the [AsyncCall] that is passed to the dispatcher
 * @throws IllegalStateException if the `executed` flag was already set
 */
override fun enqueue(responseCallback: Callback) {
    // Atomically flip the flag; only the first invocation succeeds.
    val firstExecution = executed.compareAndSet(false, true)
    check(firstExecution) { "Already Executed" }

    // Per the original note: --> eventListener.callStart(this), the listener for the whole request.
    callStart()

    val dispatcher = client.dispatcher
    dispatcher.enqueue(AsyncCall(responseCallback))
}

Dispatcher –> 用于线程调度

#Dispatcher#
/**
 * Executor that asynchronous calls are run on.
 *
 * Created on first access when [executorServiceOrNull] is still null, then cached there.
 * The getter is annotated `@get:Synchronized`.
 */
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService // Uses an Executor to do the thread scheduling
  get() {
    if (executorServiceOrNull == null) {
      // Core pool size 0, maximum Int.MAX_VALUE, 60-second keep-alive, SynchronousQueue as the work queue.
      executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
          SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
    }
    return executorServiceOrNull!!
  }

dispatcher.enqueue

#Dispatcher#
/**
 * Adds [call] to the ready queue and then tries to promote ready calls for execution.
 */
internal fun enqueue(call: AsyncCall) {
  synchronized(this) {
    // Goes into the ready deque first. Per the original note it is only "ready" because it may have
    // to wait for the limits (total requests, requests for a given host).
    readyAsyncCalls.add(call)

    // For non-WebSocket calls, check whether another call to the same host exists and record it.
    val isWebSocket = call.call.forWebSocket
    if (!isWebSocket) {
      findExistingCallWithHost(call.host)?.let { sameHostCall ->
        call.reuseCallsPerHostFrom(sameHostCall)
      }
    }
  }
  promoteAndExecute()
}

promoteAndExecute()

#Dispatcher#  把符合条件的call推举出来并执行
/**
 * Moves eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and submits them to
 * [executorService].
 *
 * Selection happens inside `synchronized(this)`; the selected calls are submitted after the
 * synchronized block has been left.
 *
 * @return true if `runningCallsCount()` was greater than zero when evaluated inside the lock
 */
private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()
        // Select the calls that satisfy the limits.
        if (runningAsyncCalls.size >= this.maxRequests) break // Overall cap reached (64 per the original note): stop scanning.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Per-host cap reached (5 per the original note): skip this call.

        // Promote: remove from the ready queue, bump the per-host counter, track as running.
        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }
    // Execute the promoted calls.
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

executeOn()

#RealCall.AsyncCall#
/**
 * Submits this call to [executorService].
 *
 * If the executor rejects the submission, the rejection is wrapped in an
 * [InterruptedIOException], passed to `noMoreExchanges`, and reported through
 * `responseCallback.onFailure`. Whenever submission did not succeed, the dispatcher is told
 * that this call has finished.
 */
fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()

      var success = false
      try {
        // This is where the thread switch happens: `this` is the enclosing class AsyncCall : Runnable,
        // so see its run() method for what actually gets executed.
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        // Convert the rejection into an IOException that keeps the original as its cause.
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }
#RealCall.AsyncCall#
/**
 * Runs the call: obtains the response via `getResponseWithInterceptorChain()` and delivers
 * either the response or a failure to `responseCallback`.
 *
 * `signalledCallback` records that `onResponse` is about to be invoked, so an [IOException]
 * raised after that point is logged instead of being reported through `onFailure`.
 * The dispatcher is always notified in `finally` that this call has finished.
 */
override fun run() {
  threadName("OkHttp ${redactedUrl()}") {
    var signalledCallback = false
    timeout.enter()
    try {
      val response = getResponseWithInterceptorChain() // *** The step that sends the request and obtains the response data.
      signalledCallback = true
      responseCallback.onResponse(this@RealCall, response) // Deliver the response to the callback.
    } catch (e: IOException) {
      if (signalledCallback) {
        // Do not signal the callback twice!
        Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
      } else {
        responseCallback.onFailure(this@RealCall, e)
      }
    } catch (t: Throwable) {
      // Non-IOException failure: cancel, report it as an IOException if the callback has not
      // been signalled yet, then rethrow the original throwable.
      cancel()
      if (!signalledCallback) {
        val canceledException = IOException("canceled due to $t")
        canceledException.addSuppressed(t)
        responseCallback.onFailure(this@RealCall, canceledException)
      }
      throw t
    } finally {
      client.dispatcher.finished(this)
    }
  }
}
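  1. 创建Call(client.newCall 返回 RealCall)
  2. 在call内部调用client.dispatcher.enqueue,把回调包装成AsyncCall作为参数传入
  3. Dispatcher内部先把AsyncCall加入准备队列(readyAsyncCalls)
  4. 遍历准备队列,把未超过限制的call移入运行队列(runningAsyncCalls),同时收集到executableCalls
  5. 遍历executableCalls,对每一个call调用executeOn
  6. executeOn内使用Dispatcher传来的executorService进行线程切换,并在run方法中通过getResponseWithInterceptorChain发起请求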