OkHttp总结

介绍

一次完整的HTTP请求流程

请求流程

线程限制

能否无限制创建线程?虽然没有限制,但是创建每个线程都是需要内存的。jdk在1.5以下,创建一个线程大约需要200k左右;jdk在1.5以上,创建一个线程大约需要1m左右。
okhttp的限制呢?
同时正在进行的异步请求:默认最大64个;
同一域名同时请求存在:默认最大5个;
看以下代码图:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@get:Synchronized var maxRequests = 64
set(maxRequests) {
require(maxRequests >= 1) { "max < 1: $maxRequests" }
synchronized(this) {
field = maxRequests
}
promoteAndExecute()
}
@get:Synchronized var maxRequestsPerHost = 5
set(maxRequestsPerHost) {
require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
synchronized(this) {
field = maxRequestsPerHost
}
promoteAndExecute()
}

但是这个数字是可以改的。

分发器

线程池:这个地方创建线程池用的是SynchronousQueue:可以自己试一下,普通的线程是不会同时执行的,但是这个SynchronousQueue创建的线程池是可以同时执行多个请求的。

1
2
3
4
5
6
7
8
9
10
11
private var executorServiceOrNull: ExecutorService? = null

@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}

请求队列:

1
2
3
4
5
6
7
8
9
10
11
//异步:准备请求的队列
/** Ready async calls in the order they'll be run. */
private val readyAsyncCalls = ArrayDeque<AsyncCall>()

//异步:正在请求的队列
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private val runningAsyncCalls = ArrayDeque<AsyncCall>()

// 同步:正在请求的队列
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private val runningSyncCalls = ArrayDeque<RealCall>()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()

val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
// 分发器功能主要在这里:首先判断请求队列大小是否超过最大请求数,再判断统一域名是否超过最大同一域名的请求数量。
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
// 没有超过,把准备执行队列里的请求移动正在执行的请求队列,同时准备队列里删除。
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}

// 执行run
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}

return isRunning
}

run方法中,执行getResponseWithInterceptorChain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
}

每执行一个请求,正在请求的队列则-1=63,如果是同一域名的请求也减1=4,同时在ready的队列则释放一个到running队列里。看下图:

拦截器


Interceptor拦截器设置的地方:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
//自定义应用拦截器位置
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
// 自定义网络拦截器位置
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)

val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)

var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}


桥接拦截器的设置代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {

@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()

val body = userRequest.body
if (body != null) {
val contentType = body.contentType()
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString())
}

val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}

if (userRequest.header("Host") == null) {
requestBuilder.header("Host", userRequest.url.toHostHeader())
}

if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive")
}

// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}

val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}

if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}

val networkRequest = requestBuilder.build()
val networkResponse = chain.proceed(networkRequest)

cookieJar.receiveHeaders(networkRequest.url, networkResponse.headers)

val responseBuilder = networkResponse.newBuilder()
.request(networkRequest)

if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}

return responseBuilder.build()
}


参考资源

OkHttp
来自享学直播