OkHttp源码
OkHttp
OkHttpClient client = new OkHttpClient();
String run(String url) throws IOException {
Request request = new Request.Builder()
.url(url)
.build();
try (Response response = client.newCall(request).execute()) {
return response.body().string();
}
}
client.newCall –> RealCall
#RealCall.kt#
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }
callStart() // -->eventListener.callStart(this) 对整个请求的监听
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
Dispatcher –> 用于线程调度
#Dispatcher#
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService //使用Executor完成线程调度
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
dispatcher.enqueue
#Dispatcher#
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call) //加入双向队列,准备执行,为啥是准备,因为线程池可能满了需要等待(总请求数,正对某个主机的请求数)
if (!call.call.forWebSocket) {//判断当前请求的主机是否有其他请求进行并记录
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}
promoteAndExecute()
#Dispatcher# 把符合条件的call推举出来并执行
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
//筛选符合条件的call
if (runningAsyncCalls.size >= this.maxRequests) break // 总上限64
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue //单个主机上限5
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
//执行
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
executeOn()
#RealCall#
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
//这里实现了线程切换,this是外层的class AsyncCall: Runnable,找到内部run方法查看具体执行
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
#RealCall#
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
val response = getResponseWithInterceptorChain()//***发送请求拿到响应数据的过程
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)//回调
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
- 创建Call
- 在call的内部client.dispatcher.enqueue,作为参数把自己传入
- Dispatch内部先把call加入准备队列
- 对队列进行轮询,不超过限制的call进入开始队列
- 遍历开始队列,每一个call.executeOn
- call.executeOn内使用dispatch传来的executorService进行线程切换,并且使用chain开始进行请求
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 炎武的学习笔记!