HTTP是我們交換數據和媒體流的現代應用網路,有效利用HTTP可以使我們節省帶寬和更快地載入數據,Square公司開源的OkHttp網路請求是有效率的HTTP客戶端。之前的知識面僅限於框架API的調用,接觸到實際的工作之後深知自己知識的不足,故而深挖框架源碼儘力吸取前輩的設計經驗。關於此框架的源碼解 ...
HTTP是我們交換數據和媒體流的現代應用網路,有效利用HTTP可以使我們節省帶寬和更快地載入數據,Square公司開源的OkHttp網路請求是有效率的HTTP客戶端。之前的知識面僅限於框架API的調用,接觸到實際的工作之後深知自己知識的不足,故而深挖框架源碼儘力吸取前輩的設計經驗。關於此框架的源碼解析網上的教程多不勝數,此文名為源碼解析,實則是炒冷飯之作,如有錯誤和不足之處還望各位看官指出。
攔截器
攔截器是OkHttp框架設計的精髓所在,攔截器所定義的是Request的所通過的責任鏈而不管Request的具體執行過程,並且可以讓開發人員自定義自己的攔截器功能並且插入到責任鏈中
用戶自定義的攔截器位於 OkHttpClient.addInterceptor() 添加到interceptors責任鏈中
RealCall.execute()執行的時候調用RealCall.getResponseWithInterceptorChain()將 來自 OkHttpClient的interceptors以及預設的攔截器一併加入到RealInterceptorChain責任鏈中並調用, 代碼並沒有對originalRequest進行封裝, InterceptorChain和originalRequest一併流轉到 RealInterceptorChain類中處理
CustomInterceptor RetryAndFollowUpInterceptor BridgeInterceptor CacheInterceptor ConnectInterceptor NetworkInterceptors CallServiceInterceptor
RealInterceptorChain.proceed()
EventListener.callStart()也是在RealCall.execute()嵌入到Request調用過程, EventListener.callEnd()位於StreamAllocation中調用
Request.Builder
- url (String/URL/HttpUrl)
- header
- CacheControl
- Tag (Use this API to attach timing, debugging, or other application data to a request so that you may read it in interceptors, event listeners, or callbacks.)
BridgeInterceptor
Bridges from application code to network code. First it builds a network request from a user request. Then it proceeds to call the network. Finally it builds a user response from the network response.
此攔截器是應用碼到網路碼的橋接。它會將用戶請求封裝成一個網路請求並且執行請求,同時它還完成從網路響應到用戶響應的轉化. 最後Chain.proceed() 方法啟動攔截器責任鏈, RealInterceptorChain中通過遞歸調用將網路請求以及響應的任務分別分配到各個攔截器中, 然後通過ResponseBuilder.build()方法將網路響應封裝, 然後遞歸調用責任鏈模式使得調用以及Response處理的過程可以一併寫入BridgeInterceptor中
public final class RealInterceptorChain implements Interceptor.Chain {
public Response proceed(Request request, StreamAllocation streamAllocation,
HttpCodec httpCodec, RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
...
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors,
streamAllocation, httpCodec,connection, index + 1, request, call,
eventListener, connectTimeout, readTimeout,writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
...
return response;
}
}
CallServiceInterceptor
Interceptor的邏輯均在intercept()方法中實現, 在通過Chain實體類獲取到請求主題之後,通過BufferedSink介面將請求轉發到Okio介面,在攔截過程中通過EventListener介面將攔截器處理狀態(主要是RequestBodyStart和RequestBodyEnd兩個狀態)發送出去
public final class CallServiceInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(true);
}
if (responseBuilder == null) {
// Write the request body if the "Expect: 100-continue" expectation was met.
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
streamAllocation.noNewStreams();
}
}
}
}
CacheInterceptor
public final class CacheInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
/**
* Track an HTTP response being satisfied with {@code cacheStrategy}.
* 主要是跟蹤networkRequest次數以及對應Cache的hitcount
*/
cache.trackResponse(strategy);
}
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}
// If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// If we don't need the network, we're done.
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
//在chain.proceed()調用下一個攔截器
Response networkResponse = null;
try {
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
//處理response並返回
...
return response;
}
}
CacheStrategy
OkHttpClient
OkHttpClient托管著所有HTTP調用, 每個Client均擁有自己的連接池和線程池
- 實現抽象類Internal的方法,這是Internel抽象類唯一的實現,方法與CacheInterceptor控制Http的Header.Lenient區域和StreamAlloction從連接池中獲取連接有關
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
...
synchronized (connectionPool) {
...
if (result == null) {
// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
return result;
}
- RouteDatabase && RouteSeletor
RouteDatabase是記錄連接失敗的連接路徑的黑名單,從而OkHttp可以從失敗中學習並且傾向於選擇其他可用的路徑,RouteSeletor通過RouteDatabase.shouldPostpone(route)方法可獲知此路徑是否近期曾連接失敗,RouteSelector部分源碼如下:
public final class RouteSelector {
/**
* Clients should invoke this method when they encounter a connectivity failure on a connection
* returned by this route selector.
* 在StreamAllocation.streamFailed()中添加了routeSelector.connectFailed()邏輯
*/
public void connectFailed(Route failedRoute, IOException failure) {
if (failedRoute.proxy().type() != Proxy.Type.DIRECT && address.proxySelector() != null) {
// Tell the proxy selector when we fail to connect on a fresh connection.
address.proxySelector().connectFailed(
address.url().uri(), failedRoute.proxy().address(), failure);
}
routeDatabase.failed(failedRoute);
}
}
Dispatcher
Dispatcher(分離器或者復用器)是非同步網路請求調用時執行的策略方法, 復用器的概念十分常見,它主要的作用是輸入的各路信號進行捲積運算,最大可能壓榨通信的帶寬,提高信息傳輸的效率。Dispatcher控制最大請求併發數和單個主機的最大併發數,並持有一個線程池負責執行非同步請求,對同步的請求只是用作統計。OkHttp在每個分離器使用一個ExecutorService內部調用請求, Dispatcher內部主要並不涉及執行的具體。
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
...
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
ExecutorSevice.execute(AsyncCall)執行代碼位於AsyncCall內部覆寫的execute()方法, 方法內定義一些Callback回調節點運行邏輯,包括用戶主動取消執行(使用retryAndFollowUpInterceptor)以及執行請求成功或者失敗時的回調方法
final class AsyncCall extends NamedRunnable {
...
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
- 惰性初始模式(Created Lazily)成員
- ExecutorService()
- CacheControl
WebSocket
WebSocket 非同步非堵塞的web socket介面 (通過Enqueue方法來實現)
OkHttpClient 通過實現 WebSocket.Factory.newWebSocket 介面實現工廠構造, 通常是由 OkHttpClient來構造
- WebSocket生命周期:
- Connecting狀態: 每個websocket的初始狀態, 此時Message可能位於入隊狀態但是還沒有被Dispatcher處理
- Open狀態: WebSocket已經被伺服器端接受並且Socket位於完全開放狀態, 所有Message入隊之後會即刻被處理
- Closing狀態: WebSocket進入優雅的關閉狀態,WebSocket繼續處理已入隊的Message但拒絕新的Message入隊
- Closed狀態: WebSocket已完成收發Message的過程, 進入完全關閉狀態
WebSocket受到網路等各種因素影響, 可能會斷路而提前進入關閉流程 - Canceled狀態: 被動WebSocket失敗連接為非優雅的過程, 而主動則是優雅短路過程
RealWebSocket
RealWebSocket管理著Request隊列內容所占的空間大小以及關閉Socket之後留給優雅關閉的時間,預設為16M和60秒,在RealWebSocket.connect()方法中RealWebSocket對OkHttpClient以及Request封裝成Call的形式,然後通過Call.enqueue()方法定義調用成功和失敗時的Callback代碼public void connect(OkHttpClient client) { client = client.newBuilder() .eventListener(EventListener.NONE) .protocols(ONLY_HTTP1) .build(); final Request request = originalRequest.newBuilder() .header("Upgrade", "websocket") .header("Connection", "Upgrade") .header("Sec-WebSocket-Key", key) .header("Sec-WebSocket-Version", "13") .build(); call = Internal.instance.newWebSocketCall(client, request); call.enqueue(new Callback() { @Override public void onResponse(Call call, Response response) { try { checkResponse(response); } catch (ProtocolException e) { failWebSocket(e, response); closeQuietly(response); return; } // Promote the HTTP streams into web socket streams. StreamAllocation streamAllocation = Internal.instance.streamAllocation(call); streamAllocation.noNewStreams(); // Prevent connection pooling! Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation); // Process all web socket messages. try { listener.onOpen(RealWebSocket.this, response); String name = "OkHttp WebSocket " + request.url().redact(); initReaderAndWriter(name, streams); streamAllocation.connection().socket().setSoTimeout(0); loopReader(); } catch (Exception e) { failWebSocket(e, null); } } @Override public void onFailure(Call call, IOException e) { failWebSocket(e, null); } }); }
當Call請求被服務端響應的時候就將HTTP流導入到Web Socket流中,並且調用WebSocketListener相對應的狀態方法, WebSocketListener狀態如下:
onOpen()
onMessage()
onClosing()
onClosed()
onFailure()
- WebSocket -> RealWebSocket
- Connection -> RealConnection
- Interceptor -> RealInterceptorChain
- Call -> RealCall
- ResponseBody -> RealResponseBody
Gzip壓縮機制
處理Gzip壓縮的代碼在BridgeInterceptor中,預設情況下為gzip壓縮狀態,可以從下麵的源碼片段中獲知。如果header中沒有Accept-Encoding
,預設自動添加 ,且標記變數transparentGzip
為true
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
BridgeInterceptor解壓縮的過程調用了okio.GzipSource()方法並調用Okio.buffer()緩存解壓過程,源碼如下
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
RealCall構造方法
在RealCall構造方法上面,早期版本的RealCall構造方法中將EventListener.Factory以及EventListenerFactory.Create()分開處理導致RealCall構造方法非線程安全. 現在版本的RealCall的構造函數使用OkHttpClient.eventListenerFactory().create()
早期版本如下:
final class RealCall implements Call {
RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
...
final EventListener.Factory eventListenerFactory = client.eventListenerFactory();
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
//重試和跟進攔截器
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
// TODO(jwilson): this is unsafe publication and not threadsafe.
// 這是不安全的發佈,不是線程安全的。
this.eventListener = eventListenerFactory.create(this);
}
}
現在 OkHttp 3.11.0 的RealCall源代碼如下
final class RealCall implements Call {
private EventListener eventListener;
...
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
}
ConnetionPool
連接池能夠復用http連接從而減少訪問相同目標主機情況下的網路延遲,此類實現管理連接開閉的策略並使用與連接池一一對應的後臺線程清理過期的連接。ConnectionPool提供對Deque<RealConnection>
進行操作的方法分別為put、get、connectionBecameIdle和evictAll幾個操作。分別對應放入連接、獲取連接、移除連接和移除所有連接操作,這裡我們舉例put和get操作。
public final class ConnectionPool {
...
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
/** The maximum number of idle connections for each address. */
private final int maxIdleConnections;
private final long keepAliveDurationNs;
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
...
}
cleanUpRunnable裡面是一個while(true),一個迴圈包括:
- 調用一次cleanUp方法進行清理並返回一個long
- 如果是-1則退出,否則調用wait方法等待這個long值的時間
okhttp是根據StreamAllocation引用計數是否為0來實現自動回收連接的。cleanUpRunnable
遍歷每一個RealConnection,通過引用數目確定哪些是空閑的,哪些是在使用中,同時找到空閑時間最長的RealConnection。如果空閑數目超過最大空閑數或者空閑時間超過最大空閑時間,則清理掉這個RealConnection並返回0,表示需要立刻再次清理
public final class ConnectionPool {
...
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
...
}
我們在put操作前首先要調用executor.execute(cleanupRunnable)
來清理閑置的線程。
RealConnection
RealConnection是socket物理連接的包裝,它裡面維護了List<Reference<StreamAllocation>>
的引用。List中StreamAllocation的數量也就是socket被引用的計數,如果計數為0的話,說明此連接沒有被使用就是空閑的,需要被回收;如果計數不為0,則表示上層代碼仍然引用,就不需要關閉連接。