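Preface
One day, the product manager suddenly tracks you down while you are slacking off.
PM: "We need to add an aggregated search feature. When a user looks up a product on our site, we query sites A, B, and C for it and return the combined results to the user."
You: "Oh, so I write a crawler and scrape data from 3 sites?"
PM: "Bah, crawling is illegal. This is called data analysis. So, can you do it?"
You: "Sure."
PM: "Great, it goes live tomorrow."
You: "..."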
Code 1.0
You finish the development quickly. The code is as follows:
/*
* blog.coder4j.cn
* Copyright (C) 2016-2020 All Rights Reserved.
*/
package cn.coder4j.study.example.thread;
import cn.hutool.core.thread.ThreadUtil;
import com.google.common.collect.Lists;
import java.util.List;
/**
* @author buhao
* @version TestCompletionService.java, v 0.1 2020-02-18 19:03 buhao
*/
public class TestCompletionService {
public static void main(String[] args) {
// Search term
String queryName = "java";
// Call the query interface
long startTime = System.currentTimeMillis();
List<String> result = queryInfoCode1(queryName);
System.out.println("耗時: " + (System.currentTimeMillis() - startTime));
System.out.println(result);
}
/**
* Aggregated query, code 1
*
* @param queryName
* @return
*/
private static List<String> queryInfoCode1(String queryName) {
List<String> resultList = Lists.newArrayList();
String webA = searchWebA(queryName);
resultList.add(webA);
String webB = searchWebB(queryName);
resultList.add(webB);
String webC = searchWebC(queryName);
resultList.add(webC);
return resultList;
}
/**
* Query site A
*
* @param name
* @return
*/
public static String searchWebA(String name) {
ThreadUtil.sleep(5000);
return "webA";
}
/**
* Query site B
*
* @param name
* @return
*/
public static String searchWebB(String name) {
ThreadUtil.sleep(3000);
return "webB";
}
/**
* Query site C
*
* @param name
* @return
*/
public static String searchWebC(String name) {
ThreadUtil.sleep(500);
return "webC";
}
}
You run the code and get the following output (耗時 is the elapsed time in milliseconds):
耗時: 8512
[webA, webB, webC]
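Whoa, why does a single request take more than 8 seconds? If this goes live, the product manager will kill me.
Debugging the code shows that the problem lies in the sites being requested: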
/**
* Query site A
*
* @param name
* @return
*/
public static String searchWebA(String name) {
ThreadUtil.sleep(5000);
return "webA";
}
/**
* Query site B
*
* @param name
* @return
*/
public static String searchWebB(String name) {
ThreadUtil.sleep(3000);
return "webB";
}
/**
* Query site C
*
* @param name
* @return
*/
public static String searchWebC(String name) {
ThreadUtil.sleep(500);
return "webC";
}
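Sites A and B have been neglected for years and nobody maintains them, so their interfaces respond slowly, averaging 5 seconds and 3 seconds respectively (simulated here with sleep). Site C performs reasonably well, averaging 0.5 seconds. The execution time of our program is the sum of the three response times: 5000 ms + 3000 ms + 500 ms = 8500 ms, which matches the measured 8512 ms once a little overhead is added.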
Code 2.0
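OK, now we know the problem: the sites we request are too slow. How do we fix it? We can hardly phone them and ask them to optimize their sites so that we can crawl them. The books teach us to look for the problem in ourselves first, so let's see where our own code can be optimized.
Analyzing the code shows that everything runs serially: site A is requested first, then site B once A finishes, then site C once B finishes. That brings to mind the first rule of improving efficiency: increase the parallelism of the code. Why request the sites one by one instead of requesting A, B, and C at the same time? Java's multithreading makes this easy. The code is as follows: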
package cn.coder4j.study.example.thread;
import cn.hutool.core.thread.ThreadUtil;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* @author buhao
* @version TestCompletionService.java, v 0.1 2020-02-18 19:03 buhao
*/
public class TestCompletionService {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// Search term
String queryName = "java";
// Call the query interface
long startTime = System.currentTimeMillis();
List<String> result = queryInfoCode2(queryName);
System.out.println("耗時: " + (System.currentTimeMillis() - startTime));
System.out.println(result);
}
/**
* Aggregated query, code 1
*
* @param queryName
* @return
*/
private static List<String> queryInfoCode1(String queryName) {
List<String> resultList = Lists.newArrayList();
String webA = searchWebA(queryName);
resultList.add(webA);
String webB = searchWebB(queryName);
resultList.add(webB);
String webC = searchWebC(queryName);
resultList.add(webC);
return resultList;
}
/**
* Aggregated query, code 2
*
* @param queryName
* @return
*/
private static List<String> queryInfoCode2(String queryName) throws ExecutionException, InterruptedException {
List<String> resultList = Lists.newArrayList();
// Create a thread pool with 3 threads
ExecutorService pool = Executors.newFixedThreadPool(3);
try {
// Submit the tasks and keep their Futures
Future<String> webAFuture = pool.submit(() -> searchWebA(queryName));
Future<String> webBFuture = pool.submit(() -> searchWebB(queryName));
Future<String> webCFuture = pool.submit(() -> searchWebC(queryName));
// Collect the results; each get() blocks until its task has finished
resultList.add(webAFuture.get());
resultList.add(webBFuture.get());
resultList.add(webCFuture.get());
} finally {
// Shut down the thread pool
pool.shutdown();
}
return resultList;
}
/**
* Query site A
*
* @param name
* @return
*/
public static String searchWebA(String name) {
ThreadUtil.sleep(5000);
return "webA";
}
/**
* Query site B
*
* @param name
* @return
*/
public static String searchWebB(String name) {
ThreadUtil.sleep(3000);
return "webB";
}
/**
* Query site C
*
* @param name
* @return
*/
public static String searchWebC(String name) {
ThreadUtil.sleep(500);
return "webC";
}
}
The key code is:
/**
* Aggregated query, code 2
*
* @param queryName
* @return
*/
private static List<String> queryInfoCode2(String queryName) throws ExecutionException, InterruptedException {
List<String> resultList = Lists.newArrayList();
// Create a thread pool with 3 threads
ExecutorService pool = Executors.newFixedThreadPool(3);
try {
// Submit the tasks and keep their Futures
Future<String> webAFuture = pool.submit(() -> searchWebA(queryName));
Future<String> webBFuture = pool.submit(() -> searchWebB(queryName));
Future<String> webCFuture = pool.submit(() -> searchWebC(queryName));
// Collect the results; each get() blocks until its task has finished
resultList.add(webAFuture.get());
resultList.add(webBFuture.get());
resultList.add(webCFuture.get());
} finally {
// Shut down the thread pool
pool.shutdown();
}
return resultList;
}
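The code that requests the sites has not changed by a single line. What changed is the place where we call it: the serial calls became multithreaded. And not plain multithreading either: because the main thread needs the results of the worker threads, we use Future. (See the earlier article 【併發那些事】創建線程的三種方式, "Three ways to create a thread".)
Now run the code and look at the result: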
耗時: 5058
[webA, webB, webC]
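The effect is obvious: from over 8 seconds down to just over 5. But that is still unacceptably long, and a programmer with ambition keeps optimizing. Let's analyze. At first the code was serial (request A, then B, then C), so the total time was the sum of the three requests.
Then we parallelized the serial requests, so A, B, and C are now requested at the same time.
Because the requests run in parallel, this resembles the bucket effect: the total time is determined by the slowest request, here the 5-second request to site A.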
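The sum-versus-max reasoning can be checked with a small sketch. The class name `SerialVsParallelDemo`, the helper `slowCall`, and the shortened delays (300/150/20 ms in place of 5 s/3 s/0.5 s) are assumptions made for illustration and are not part of the original project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SerialVsParallelDemo {

    // Shortened stand-ins for the 5 s / 3 s / 0.5 s delays in the article (assumption).
    static final String[] SITES = {"webA", "webB", "webC"};
    static final long[] DELAYS_MS = {300, 150, 20};

    // Hypothetical slow remote call: sleeps, then returns the site name.
    static String slowCall(String site, long delayMillis) {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while calling " + site, e);
        }
        return site;
    }

    // Serial: the calls run one after another, so the delays add up.
    static long serialMillis() {
        long start = System.nanoTime();
        for (int i = 0; i < SITES.length; i++) {
            slowCall(SITES[i], DELAYS_MS[i]);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Parallel: all calls start together, so we wait only for the slowest one.
    static long parallelMillis() {
        ExecutorService pool = Executors.newFixedThreadPool(SITES.length);
        try {
            long start = System.nanoTime();
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < SITES.length; i++) {
                final int idx = i;
                Callable<String> task = () -> slowCall(SITES[idx], DELAYS_MS[idx]);
                futures.add(pool.submit(task));
            }
            for (Future<String> future : futures) {
                future.get(); // blocks until this particular task has finished
            }
            return (System.nanoTime() - start) / 1_000_000;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("serial   ms: " + serialMillis());   // roughly 300 + 150 + 20
        System.out.println("parallel ms: " + parallelMillis()); // roughly 300
    }
}
```

Exact numbers vary from run to run, but the serial figure should sit near the sum of the delays and the parallel figure near the largest single delay.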
Code 3.0
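Having analyzed this far, and given that we cannot speed up sites A and B, there is little left to optimize. But there are always more solutions than problems. We really cannot shrink the total request time any further, but we can improve the user experience. That takes two techniques: WebSocket and CompletionService. WebSocket can be thought of simply as server push: the client does not have to ask, the server pushes messages on its own initiative. (WebSocket is not the focus of this article and is only mentioned in passing; for an implementation, see the earlier article 【websocket】spring boot 集成 websocket 的四種方式, "Four ways to integrate WebSocket with Spring Boot".) Straight to the code: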
package cn.coder4j.study.example.thread;
import cn.hutool.core.thread.ThreadUtil;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* @author buhao
* @version TestCompletionService.java, v 0.1 2020-02-18 19:03 buhao
*/
public class TestCompletionService {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// Search term
String queryName = "java";
// Call the query interface
long startTime = System.currentTimeMillis();
queryInfoCode3(queryName);
System.out.println("耗時: " + (System.currentTimeMillis() - startTime));
}
/**
* Aggregated query, code 1
*
* @param queryName
* @return
*/
private static List<String> queryInfoCode1(String queryName) {
List<String> resultList = Lists.newArrayList();
String webA = searchWebA(queryName);
resultList.add(webA);
String webB = searchWebB(queryName);
resultList.add(webB);
String webC = searchWebC(queryName);
resultList.add(webC);
return resultList;
}
/**
* Aggregated query, code 2
*
* @param queryName
* @return
*/
private static List<String> queryInfoCode2(String queryName) throws ExecutionException, InterruptedException {
List<String> resultList = Lists.newArrayList();
// Create a thread pool with 3 threads
ExecutorService pool = Executors.newFixedThreadPool(3);
try {
// Submit the tasks and keep their Futures
Future<String> webAFuture = pool.submit(() -> searchWebA(queryName));
Future<String> webBFuture = pool.submit(() -> searchWebB(queryName));
Future<String> webCFuture = pool.submit(() -> searchWebC(queryName));
// Collect the results; each get() blocks until its task has finished
resultList.add(webAFuture.get());
resultList.add(webBFuture.get());
resultList.add(webCFuture.get());
} finally {
// Shut down the thread pool
pool.shutdown();
}
return resultList;
}
/**
* Aggregated query, code 3
*
* @param queryName the search term
*/
private static void queryInfoCode3(String queryName) throws ExecutionException, InterruptedException {
// Start time
long startTime = System.currentTimeMillis();
// Create the thread pool and wrap it in a CompletionService
ExecutorService pool = Executors.newFixedThreadPool(3);
ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(pool);
try {
// Submit the tasks
executorCompletionService.submit(() -> searchWebA(queryName));
executorCompletionService.submit(() -> searchWebB(queryName));
executorCompletionService.submit(() -> searchWebC(queryName));
for (int i = 0; i < 3; i++) {
// take() blocks until the next task finishes, whichever one that is
Future<String> take = executorCompletionService.take();
System.out.println("獲得請求結果 -> " + take.get());
System.out.println("通過 ws 推送給客戶端,總共耗時" + (System.currentTimeMillis() - startTime));
}
} finally {
// Shut down the pool so its non-daemon threads do not keep the JVM alive
pool.shutdown();
}
}
/**
* Query site A
*
* @param name
* @return
*/
public static String searchWebA(String name) {
ThreadUtil.sleep(5000);
return "webA";
}
/**
* Query site B
*
* @param name
* @return
*/
public static String searchWebB(String name) {
ThreadUtil.sleep(3000);
return "webB";
}
/**
* Query site C
*
* @param name
* @return
*/
public static String searchWebC(String name) {
ThreadUtil.sleep(500);
return "webC";
}
}
The core code is:
/**
* Aggregated query, code 3
*
* @param queryName the search term
*/
private static void queryInfoCode3(String queryName) throws ExecutionException, InterruptedException {
// Start time
long startTime = System.currentTimeMillis();
// Create the thread pool and wrap it in a CompletionService
ExecutorService pool = Executors.newFixedThreadPool(3);
ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(pool);
try {
// Submit the tasks
executorCompletionService.submit(() -> searchWebA(queryName));
executorCompletionService.submit(() -> searchWebB(queryName));
executorCompletionService.submit(() -> searchWebC(queryName));
for (int i = 0; i < 3; i++) {
// take() blocks until the next task finishes, whichever one that is
Future<String> take = executorCompletionService.take();
System.out.println("獲得請求結果 -> " + take.get());
System.out.println("通過 ws 推送給客戶端,總共耗時" + (System.currentTimeMillis() - startTime));
}
} finally {
// Shut down the pool so its non-daemon threads do not keep the JVM alive
pool.shutdown();
}
}
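First, the output (獲得請求結果 means "got request result"; 通過 ws 推送給客戶端,總共耗時 means "pushed to the client via ws, total elapsed time", in milliseconds):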
獲得請求結果 -> webC
通過 ws 推送給客戶端,總共耗時561
獲得請求結果 -> webB
通過 ws 推送給客戶端,總共耗時3055
獲得請求結果 -> webA
通過 ws 推送給客戶端,總共耗時5060
耗時: 5060
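Let's analyze the output. The total time is still just over 5 seconds, but we no longer wait for everything to finish before pushing to the client; each result is pushed as soon as it is ready. There is also a pattern: the fastest request is pushed first, then the second fastest, and the slowest last. In other words, results arrive in completion order. From the user's point of view, site C's data appears within 1 second of clicking the button, site B's data is added about 2.5 seconds later (at roughly 3 seconds), and site A's data about 2 seconds after that (at roughly 5 seconds). That is a much better experience than staring at a blank screen for 5 seconds and then getting everything at once.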
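The "push each result as soon as it is ready" flow can be sketched with a callback standing in for the WebSocket push. The class name `PushAsCompletedDemo`, the `pushToClient` callback, the `search` helper, and the shortened delays are all hypothetical; a real application would send over its own WebSocket session instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class PushAsCompletedDemo {

    // Hypothetical slow site query: sleeps, then returns the site name.
    static String search(String site, long delayMillis) {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while querying " + site, e);
        }
        return site;
    }

    // Queries three sites in parallel and hands each result to pushToClient
    // (a stand-in for a WebSocket send) in the order the tasks complete.
    static void queryAndPush(Consumer<String> pushToClient) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        try {
            service.submit(() -> search("webA", 300));
            service.submit(() -> search("webB", 150));
            service.submit(() -> search("webC", 20));
            for (int i = 0; i < 3; i++) {
                // take() returns the next finished task, not the next submitted one
                pushToClient.accept(service.take().get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> pushed = new ArrayList<>();
        queryAndPush(pushed::add);
        System.out.println(pushed); // with these delays: [webC, webB, webA]
    }
}
```

Passing the push step in as a `Consumer` keeps the concurrency logic independent of the transport, which also makes the ordering easy to observe in a plain list.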
Magical, isn't it? The hero behind this is CompletionService. Its source is as follows:
package java.util.concurrent;
/**
* A service that decouples the production of new asynchronous tasks
* from the consumption of the results of completed tasks. Producers
* {@code submit} tasks for execution. Consumers {@code take}
* completed tasks and process their results in the order they
* complete. A {@code CompletionService} can for example be used to
* manage asynchronous I/O, in which tasks that perform reads are
* submitted in one part of a program or system, and then acted upon
* in a different part of the program when the reads complete,
* possibly in a different order than they were requested.
*
* <p>Typically, a {@code CompletionService} relies on a separate
* {@link Executor} to actually execute the tasks, in which case the
* {@code CompletionService} only manages an internal completion
* queue. The {@link ExecutorCompletionService} class provides an
* implementation of this approach.
*
* <p>Memory consistency effects: Actions in a thread prior to
* submitting a task to a {@code CompletionService}
* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
* actions taken by that task, which in turn <i>happen-before</i>
* actions following a successful return from the corresponding {@code take()}.
*/
public interface CompletionService<V> {
/**
* Submits a value-returning task for execution and returns a Future
* representing the pending results of the task. Upon completion,
* this task may be taken or polled.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<V> submit(Callable<V> task);
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. Upon completion, this task may be
* taken or polled.
*
* @param task the task to submit
* @param result the result to return upon successful completion
* @return a Future representing pending completion of the task,
* and whose {@code get()} method will return the given
* result value upon completion
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<V> submit(Runnable task, V result);
/**
* Retrieves and removes the Future representing the next
* completed task, waiting if none are yet present.
*
* @return the Future representing the next completed task
* @throws InterruptedException if interrupted while waiting
*/
Future<V> take() throws InterruptedException;
/**
* Retrieves and removes the Future representing the next
* completed task, or {@code null} if none are present.
*
* @return the Future representing the next completed task, or
* {@code null} if none are present
*/
Future<V> poll();
/**
* Retrieves and removes the Future representing the next
* completed task, waiting if necessary up to the specified wait
* time if none are yet present.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the Future representing the next completed task or
* {@code null} if the specified waiting time elapses
* before one is present
* @throws InterruptedException if interrupted while waiting
*/
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
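CompletionService declares the following methods:
Future<V> submit(Callable<V> task): submits a Callable, that is, a task whose result can be retrieved.
Future<V> submit(Runnable task, V result): submits a Runnable together with a result object. It works like the submit above, but because Runnable.run() returns void and cannot produce a result itself, the result argument supplies the value that the Future returns on completion.
Future<V> take() throws InterruptedException: retrieves and removes the Future of the next completed task, blocking until one is available.
Future<V> poll(): like take, but non-blocking; it returns null if no task has completed yet.
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException: same as poll(), but waits up to the given timeout before returning null.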
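The difference between the three retrieval methods, and the role of the `result` argument, can be seen in a short sketch. The class name `PollVersusTakeDemo`, the `pause` helper, and the 300 ms task are assumptions for illustration.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PollVersusTakeDemo {

    // Hypothetical helper that simulates work by sleeping.
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static String describe() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        try {
            // A Runnable returns nothing, so "done" is the value the Future will report.
            service.submit(() -> pause(300), "done");

            // Non-blocking: the task needs about 300 ms, so nothing is queued yet.
            Future<String> immediate = service.poll();
            // Waits at most 20 ms, which is still too early for this task.
            Future<String> timed = service.poll(20, TimeUnit.MILLISECONDS);
            // Blocks until the task has actually finished.
            Future<String> blocking = service.take();

            return "poll=" + immediate + ", timedPoll=" + timed + ", take=" + blocking.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe()); // poll=null, timedPoll=null, take=done
    }
}
```

In practice `poll(timeout, unit)` is a reasonable way to cap how long a slow site may hold up the response, since a null return signals that the deadline passed.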
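Also, CompletionService is an interface and cannot be instantiated directly. The usual choice is its implementation class ExecutorCompletionService, used as in the demo above.
You may be curious how ExecutorCompletionService works. The principle is simple: it maintains a blocking queue internally, and each submitted task's Future enters that queue when the task finishes, so tasks that finish first are queued first. What poll or take hands you is therefore always the earliest-finished task that has not yet been taken.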
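The queue-based principle can be illustrated with a deliberately simplified sketch. `MiniCompletionService` is a hypothetical class written for this article, not the JDK source: it wraps each task in a `FutureTask` whose `done()` hook (called by `FutureTask` when the task completes) places the finished future on a `LinkedBlockingQueue`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

public class MiniCompletionService<V> {

    private final Executor executor;
    // Finished futures land here in the order in which they complete.
    private final BlockingQueue<Future<V>> completionQueue = new LinkedBlockingQueue<>();

    public MiniCompletionService(Executor executor) {
        this.executor = executor;
    }

    public Future<V> submit(Callable<V> task) {
        FutureTask<V> future = new FutureTask<V>(task) {
            @Override
            protected void done() {
                // Runs once the task has completed (normally, with an exception, or cancelled).
                completionQueue.add(this);
            }
        };
        executor.execute(future);
        return future;
    }

    // Blocks until some task has finished.
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    // Returns null when no task has finished yet.
    public Future<V> poll() {
        return completionQueue.poll();
    }

    // Small demonstration: the slow task is submitted first but taken last.
    static List<String> demoOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        MiniCompletionService<String> service = new MiniCompletionService<>(pool);
        try {
            service.submit(() -> { Thread.sleep(200); return "slow"; });
            service.submit(() -> "fast");
            List<String> order = new ArrayList<>();
            order.add(service.take().get());
            order.add(service.take().get());
            return order;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demoOrder()); // [fast, slow]
    }
}
```

Because enqueueing happens in the completion hook rather than at submission, the consumer never has to guess which future to wait on.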
Miscellaneous
1. Project code
Space is limited, so not all of the code is reproduced here. If you run into problems, see the source on GitHub.