Executor Framework

Source: https://www.cnblogs.com/nativestack/archive/2018/09/30/java-executor.html


Why?

Look at the following two versions of a simple socket-based web server. Can you spot the problems? (I have listed them in the comments.)

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/*
 * Single thread: poor response time and throughput (the CPU sits idle).
 * Think about where it blocks: reading from or writing to the socket, and other I/O
 * such as the file system, a database, or the network.
 */
class SingleThreadWebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            Socket connection = socket.accept();
            handleRequest(connection); // handleRequest(Socket) is left undefined in this sketch
        }
    }
}

/*
 * Multiple threads: good response time and throughput.
 * Problems:
 * 1. A thread is created for each request. Creating a thread consumes resources (CPU time and memory).
 * 2. Each thread serves only one request (threads are not reused).
 * 3. Under heavy concurrent load, the JVM comes under GC pressure or may even throw OutOfMemoryError.
 */
class ThreadPerTaskWebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            new Thread(task).start();
        }
    }
}

Java Executor Framework

The framework offers a flexible design for running tasks asynchronously. Based on the producer/consumer pattern, it decouples task submission from task execution and provides mechanisms to manage the lifecycle of both the service and its tasks.

The Executor Interface

The interface is simple, with a single method:

public interface Executor { 
    void execute(Runnable command);
}

Depending on the Executor implementation, a task (command) can run in a new thread, in a pooled thread, or in the calling thread. For example:

public class MyExecutor implements Executor {
    public void execute(Runnable r) {
        new Thread(r).start(); // option 1: run in a new thread
        // r.run();            // option 2: run in the calling thread instead
    }
}

Runnable & Callable

Both are interfaces, in different packages (Runnable in java.lang, Callable in java.util.concurrent). Unlike Runnable's run method, Callable's call method returns a value and can throw a checked exception. Tasks often need to return a value, for example when querying a database or performing a calculation. The Executor interface offers only very basic task submission, where a task is a Runnable. So if you do not care about the result or about exception handling, just use Executor.execute(Runnable). How a Callable is run in a thread is covered later on this page.
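To make the difference concrete, here is a minimal sketch that submits one Runnable and one Callable to the same pool. The class RunnableVsCallable and the helper sumOnPool are hypothetical names used only for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class RunnableVsCallable {
    // Hypothetical helper: adds two numbers on a worker thread.
    static int sumOnPool(int a, int b) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Runnable: returns nothing and cannot throw a checked exception.
            Runnable logTask = () -> System.out.println("adding " + a + " and " + b);
            pool.execute(logTask);

            // Callable: returns a value and is allowed to throw a checked exception.
            Callable<Integer> sumTask = () -> a + b;
            Future<Integer> future = pool.submit(sumTask);
            return future.get(); // blocks until the result is ready
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOnPool(2, 3));
    }
}
```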

The ExecutorService Interface

The JVM cannot exit until all non-daemon threads have terminated, so failing to shut down an executor can prevent the JVM from exiting.

An Executor runs tasks asynchronously, so at any given time the state of previously submitted tasks is not immediately obvious. Some may have finished, some may be running, and some may be queued awaiting execution. Because Executor implementations provide a service to the application, they should be able to be shut down as well, both gracefully (stop accepting new tasks but finish the submitted ones) and abruptly (like pulling the plug), and they should report back to the application the status of the tasks affected by the shutdown.

The interface for this is ExecutorService, which extends Executor with methods for managing the lifecycle of the service and convenience methods for task submission. For example, use ExecutorService.submit(Runnable) if you do not need a result (the returned Future's get method returns null on successful completion), or ExecutorService.submit(Runnable, T) to have the Future return a given value once the Runnable completes successfully.
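The submit overloads can be compared side by side. The following is a small sketch; SubmitVariants and demo are hypothetical names, and the values are arbitrary.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitVariants {
    // Returns what get() yields for each of the three submit overloads, in order.
    static Object[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable noop = () -> { };
            Callable<Integer> answer = () -> 6 * 7;

            Future<?> f1 = pool.submit(noop);              // get() yields null on success
            Future<String> f2 = pool.submit(noop, "done"); // get() yields the given value
            Future<Integer> f3 = pool.submit(answer);      // get() yields the computed value

            return new Object[] { f1.get(), f2.get(), f3.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```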

"An Executor that provides methods to manage termination and methods that can produce a Future for tracking the progress of one or more asynchronous tasks."

The shutdown method allows previously submitted tasks to execute before terminating, while the shutdownNow method prevents waiting tasks from starting and attempts to stop currently executing tasks. A running task can only be stopped if it cooperates, by responding properly to the interruption (for example, handling the InterruptedException raised when the executor calls interrupt on the thread running it).

Upon termination, an executor has no tasks actively executing, no tasks awaiting execution, and no new tasks can be submitted. An unused ExecutorService should be shut down to allow reclamation of its resources.   
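A common way to combine the two shutdown methods is to try a graceful shutdown first and fall back to an abrupt one after a timeout. The sketch below follows that idea; ShutdownDemo and shutdownAndAwait are hypothetical names, and the timeout is up to the caller.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownDemo {
    // Tries a graceful shutdown first and falls back to shutdownNow().
    // Returns true if the pool reached the terminated state.
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks; already submitted tasks still run
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            List<Runnable> neverStarted = pool.shutdownNow(); // interrupt running tasks
            System.out.println(neverStarted.size() + " task(s) never started");
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("working"));
        System.out.println(shutdownAndAwait(pool, 5, TimeUnit.SECONDS));
    }
}
```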

Method submit extends base method Executor.execute(Runnable) by creating and returning a Future that can be used to cancel execution and/or wait for completion. Methods invokeAny and invokeAll perform the most commonly useful forms of bulk execution, executing a collection of tasks and then waiting for at least one, or all, to complete. (Class ExecutorCompletionService can be used to write customized variants of these methods.) 
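As an illustration of bulk execution, this sketch runs the same set of tasks through invokeAll and invokeAny. The class BulkExecutionDemo, its helpers, and the pool size of 4 are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BulkExecutionDemo {
    // Hypothetical helper: one Callable per input, each computing a square.
    static List<Callable<Integer>> squareTasks(List<Integer> inputs) {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int n : inputs) {
            tasks.add(() -> n * n);
        }
        return tasks;
    }

    // invokeAll: waits for every task; the futures come back in task order.
    static List<Integer> allSquares(List<Integer> inputs) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : pool.invokeAll(squareTasks(inputs))) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    // invokeAny: returns the result of one task that completed successfully.
    static int anySquare(List<Integer> inputs) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            return pool.invokeAny(squareTasks(inputs));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("no task succeeded", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> inputs = Arrays.asList(1, 2, 3);
        System.out.println(allSquares(inputs));
        System.out.println(anySquare(inputs)); // which square is returned may vary
    }
}
```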

Future, FutureTask

Future as an interface represents the lifecycle of a task and provides methods to test whether the task has completed or been canceled, retrieve its result, and cancel the task.

cancel():

Attempts to cancel execution of this task. The attempt fails if the task has already completed, has already been cancelled, or cannot be cancelled for some other reason. If the attempt succeeds and the task has not started when cancel is called, the task never runs (it is never picked from the queue). If the task has already started, the mayInterruptIfRunning parameter determines whether the thread executing it is interrupted in an attempt to stop it. If it is true, the worker thread is interrupted, but the task actually stops early only if it responds to the interruption (for example, by handling InterruptedException). If it is false, an in-progress task is allowed to complete.

After this method returns, subsequent calls to isDone will always return true. Subsequent calls to isCancelled will always return true if this method returned true.

In the Executor framework, tasks that have been submitted but not yet started can always be cancelled, and tasks that have started can sometimes be cancelled if they are responsive to interruption. Cancelling a task that has already completed has no effect.
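The following sketch cancels a task that is already running and that cooperates with interruption. CancelDemo and cancelRunningTask are hypothetical names; the 60-second sleep merely stands in for a long blocking call.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class CancelDemo {
    // Returns {cancel() result, isCancelled(), isDone(), task observed the interrupt}.
    static boolean[] cancelRunningTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        Runnable sleeper = () -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // stands in for a long blocking call
            } catch (InterruptedException e) {
                interrupted.countDown(); // cooperate: stop early when interrupted
            }
        };
        try {
            Future<?> future = pool.submit(sleeper);
            started.await(); // make sure the task is running before cancelling it
            boolean cancelResult = future.cancel(true); // true: interrupt the worker thread
            boolean sawInterrupt = interrupted.await(5, TimeUnit.SECONDS);
            return new boolean[] { cancelResult, future.isCancelled(), future.isDone(), sawInterrupt };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(cancelRunningTask()));
    }
}
```

If the task ignored the interrupt instead, cancel(true) would still report the Future as cancelled, but the worker thread would stay busy until the task finished on its own.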

get():

Declared to throw InterruptedException and ExecutionException; it can also throw the unchecked CancellationException. The timed version additionally throws TimeoutException.

Waits (blocks) if necessary for the computation to complete, and then retrieves its result. InterruptedException and ExecutionException are checked exceptions, so callers must handle them.
ExecutionException wraps whatever the task threw (runtime exceptions, custom exceptions, errors, ...); use e.getCause() to get the original exception.
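A short sketch of unwrapping a task failure with getCause(). GetDemo and failureCause are hypothetical names, and the failing task exists only to trigger the exception.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class GetDemo {
    // Returns the exception that the task itself threw.
    static Throwable failureCause() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Callable<Integer> failing = () -> {
            throw new IllegalArgumentException("bad input");
        };
        try {
            Future<Integer> future = pool.submit(failing);
            future.get(5, TimeUnit.SECONDS); // timed version of get()
            return null; // not reached here: the task always throws
        } catch (ExecutionException e) {
            return e.getCause(); // unwrap to get the task's own exception
        } catch (TimeoutException e) {
            throw new IllegalStateException("task did not finish in time", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(failureCause());
    }
}
```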

isCancelled():

Returns true if this task was cancelled before it completed normally.

isDone():

Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.

public interface Callable<V> {
    V call() throws Exception;
}

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    // Both get methods can also throw the unchecked CancellationException.
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Here is a question: how does a Callable run as a task in a (consumer) thread of an ExecutorService? As we know, the Java thread model needs a Runnable (the target) that implements the run method, so how does this work? When a task is submitted through ExecutorService.submit(Callable<T>), the code in AbstractExecutorService wraps the Callable in a FutureTask in its newTaskFor(Callable) method. FutureTask implements RunnableFuture. As the name suggests, RunnableFuture is an interface that extends both Runnable and Future. So the run method of FutureTask invokes Callable.call() and sets the result of the task (the FutureTask).

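The run method of FutureTask and the wrapping done in newTaskFor (excerpts from the JDK source):

    // java.util.concurrent.FutureTask
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();   // invoke the wrapped Callable
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);    // get() will report this via ExecutionException
                }
                if (ran)
                    set(result);         // publish the result to callers of get()
            }
        } finally {
            // cleanup omitted in this excerpt
        }
    }

    // java.util.concurrent.AbstractExecutorService
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }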
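Because FutureTask is both a Runnable and a Future, it can also be handed to a plain Thread without any ExecutorService. The sketch below shows this; FutureTaskDemo and runOnPlainThread are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureTaskDemo {
    static int runOnPlainThread() {
        Callable<Integer> answer = () -> 21 * 2;
        // As a Runnable, the FutureTask can be the target of a Thread;
        // as a Future, it lets the caller fetch the Callable's result.
        FutureTask<Integer> task = new FutureTask<>(answer);
        new Thread(task).start();
        try {
            return task.get(); // blocks until run() has set the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnPlainThread());
    }
}
```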

Execution policy and Thread pool

Because the framework decouples task submission from execution, it supports execution policies, which specify the "what, where, when, and how" of task execution, including:

• In what thread will tasks be executed?
• In what order should tasks be executed (FIFO, LIFO, priority order)?
• How many tasks may execute concurrently?
• How many tasks may be queued pending execution?
• If a task has to be rejected because the system is overloaded, which task should be selected as the victim, and how should the application be notified?
• What actions should be taken before or after executing a task?

An execution policy lets you match task execution to the available system resources.

A thread pool uses a work queue to hold tasks waiting to be executed. A worker thread takes the next task from the queue, executes it, and then returns to the pool to wait for another task.

Below is a new version of the web server demo that uses a thread pool. What are the benefits?

1) Reusing threads avoids the cost of repeatedly creating and destroying them, which improves response time.

2) You can size the pool so that there are enough threads to keep the CPU busy, without creating too many of them.

class TaskExecutionWebServer {
    private static final int NTHREADS = 100;
    private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);

    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            exec.execute(task);
        }
    }
}

You can create a thread pool by calling one of the static factory methods in Executors, provided by the Java library:

  • newFixedThreadPool
    "Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shutdown."
  • newCachedThreadPool
    Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available. These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks. Calls to execute will reuse previously constructed threads if available. If no existing thread is available, a new thread will be created and added to the pool. Threads that have not been used for sixty seconds are terminated and removed from the cache. Thus, a pool that remains idle for long enough will not consume any resources. Note that pools with similar properties but different details (for example, timeout parameters) may be created using ThreadPoolExecutor constructors.
  • newSingleThreadExecutor
    A single-threaded executor creates a single worker thread to process tasks, replacing it if it dies unexpectedly. Tasks are guaranteed to be processed sequentially according to the order imposed by the task queue (FIFO, LIFO, priority order).

  • newScheduledThreadPool
    Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically, similar to Timer.

You can also create a custom pool whose work queue implements FIFO, LIFO, or priority ordering. With a pool-based policy, the web server example no longer fails under heavy load by creating too many threads. But because the work queue is unbounded, it can still fail if tasks arrive much faster than they are executed (the producer is much faster than the consumer). If needed, we can give the pool a bounded work queue.
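A bounded pool can be sketched with the ThreadPoolExecutor constructor directly. The sizes below (one thread, queue capacity of one) are deliberately tiny so that saturation is easy to observe; BoundedPoolDemo and countRejections are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolDemo {
    // Submits blocking tasks to a pool with 1 thread and a queue of capacity 1,
    // and returns how many submissions were rejected.
    static int countRejections(int submissions) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1,                                  // core and maximum pool size
                0L, TimeUnit.MILLISECONDS,             // keep-alive for idle extra threads
                new ArrayBlockingQueue<Runnable>(1),   // bounded work queue
                new ThreadPoolExecutor.AbortPolicy()); // reject by throwing an exception
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        try {
            for (int i = 0; i < submissions; i++) {
                try {
                    pool.execute(blocker);
                } catch (RejectedExecutionException e) {
                    rejected++; // the thread is busy and the queue is full
                }
            }
            return rejected;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(countRejections(4));
    }
}
```

With the abort policy, the caller learns about overload through RejectedExecutionException; ThreadPoolExecutor.CallerRunsPolicy is an alternative that slows the producer down by running the task in the submitting thread.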

Examples/Applications

  • Shutting down the ExecutorService. The code below shows the situations in which a task submission is rejected.

class LifecycleWebServer {
    private final ExecutorService exec = ...;

    public void start() throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (!exec.isShutdown()) {
            try {
                final Socket conn = socket.accept();
                exec.execute(new Runnable() {
                    public void run() {
                        handleRequest(conn);
                    }
                });
            } catch (RejectedExecutionException e) {
                // Thrown when the Executor has been shut down, and also when the Executor uses
                // finite bounds for both maximum threads and work queue capacity, and is saturated.
                if (!exec.isShutdown())
                    log("task submission rejected", e);
            }
        }
    }

    public void stop() {
        exec.shutdown();
    }

    void handleRequest(Socket connection) {
        Request req = readRequest(connection);
        if (isShutdownRequest(req))
            stop();
        else
            dispatchRequest(req);
    }
}
  • Delayed and periodic tasks
    Use ScheduledThreadPoolExecutor instead of the Timer/TimerTask classes. If you need to build your own scheduling service, you may still be able to take advantage of the library by using a DelayQueue, a BlockingQueue implementation that provides the scheduling functionality of ScheduledThreadPoolExecutor. A DelayQueue manages a collection of Delayed objects. A Delayed has a delay time associated with it: DelayQueue lets you take an element only if its delay has expired. Objects are returned from a DelayQueue ordered by the time associated with their delay.
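    For the periodic case, a minimal sketch with a scheduled pool might look as follows. ScheduledDemo and runPeriodically are hypothetical names, and the extra five seconds in the wait is an arbitrary safety margin.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduledDemo {
    // Fires a task every periodMillis and reports whether it ran `times` times
    // before a generous timeout expired.
    static boolean runPeriodically(int times, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch ticks = new CountDownLatch(times);
        try {
            Runnable tick = ticks::countDown;
            // First run immediately (initial delay 0), then once per period.
            ScheduledFuture<?> handle =
                    scheduler.scheduleAtFixedRate(tick, 0, periodMillis, TimeUnit.MILLISECONDS);
            boolean finished = ticks.await(times * periodMillis + 5_000, TimeUnit.MILLISECONDS);
            handle.cancel(false); // no further runs; a run in progress may finish
            return finished;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runPeriodically(3, 100));
    }
}
```

    For a one-shot delayed task, ScheduledExecutorService.schedule takes a Runnable or Callable together with a delay and a TimeUnit.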
  • ExecutorCompletionService
    What if you submit many tasks and want each result as soon as it is available? Repeatedly polling each Future with get and a timeout of zero is not a good idea. CompletionService combines an Executor with a BlockingQueue. It delegates task execution to the Executor and puts each Future into the BlockingQueue once it is done, by overriding the done() method of FutureTask. To do this, ExecutorCompletionService wraps each task in a QueueingFuture. You can call service.take() to get the next finished Future from the queue as soon as one is available, and then call get on it for the result.
    for (int t = 0, n = info.size(); t < n; t++) {
        Future<ImageData> f = completionService.take();
        ImageData imageData = f.get();
        renderImage(imageData);
    }

    By calling completionService.take(), you get each result as soon as any of the tasks completes.
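    A self-contained sketch of the same idea, using sleep durations in place of real work. CompletionOrderDemo and completionOrder are hypothetical names; at least one delay must be passed, because a fixed pool cannot have zero threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {
    // Submits one task per delay and returns the delays in the order the tasks finished.
    static List<Long> completionOrder(long... delaysMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(delaysMillis.length);
        CompletionService<Long> completionService = new ExecutorCompletionService<>(pool);
        try {
            for (long delay : delaysMillis) {
                completionService.submit(() -> {
                    Thread.sleep(delay); // stands in for real work
                    return delay;
                });
            }
            List<Long> finished = new ArrayList<>();
            for (int i = 0; i < delaysMillis.length; i++) {
                // take() blocks until some task is done, whichever was submitted first
                finished.add(completionService.take().get());
            }
            return finished;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(completionOrder(500, 10));
    }
}
```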

  • Placing time limits on tasks. Sometimes we want to cancel a task once it exceeds a time budget.
    Page renderPageWithAd() throws InterruptedException {
        long endNanos = System.nanoTime() + TIME_BUDGET;
        Future<Ad> f = exec.submit(new FetchAdTask());
        // Render the page while waiting for the ad
        Page page = renderPageBody();
        Ad ad;
        try {
            // Only wait for the remaining time budget
            long timeLeft = endNanos - System.nanoTime();
            ad = f.get(timeLeft, NANOSECONDS);
        } catch (ExecutionException e) {
            ad = DEFAULT_AD;
        } catch (TimeoutException e) {
            ad = DEFAULT_AD;
            f.cancel(true);
        }
        page.setAd(ad);
        return page;
    }

    If you have a list of tasks, the timed version of invokeAll is more convenient:

    private class QuoteTask implements Callable<TravelQuote> {
        private final TravelCompany company;
        private final TravelInfo travelInfo;
        ...
        public TravelQuote call() throws Exception {
            return company.solicitQuote(travelInfo);
        }
    }

    public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo, Set<TravelCompany> companies,
            Comparator<TravelQuote> ranking, long time, TimeUnit unit)
            throws InterruptedException {
        List<QuoteTask> tasks = new ArrayList<QuoteTask>();
        for (TravelCompany company : companies)
            tasks.add(new QuoteTask(company, travelInfo));

        // Tasks that have not finished when the timeout expires are cancelled.
        List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit);

        List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size());
        Iterator<QuoteTask> taskIter = tasks.iterator();
        for (Future<TravelQuote> f : futures) {
            QuoteTask task = taskIter.next();
            try {
                quotes.add(f.get());
            } catch (ExecutionException e) {
                quotes.add(task.getFailureQuote(e.getCause()));
            } catch (CancellationException e) {
                quotes.add(task.getTimeoutQuote(e));
            }
        }
        Collections.sort(quotes, ranking);
        return quotes;
    }

     

Reference:

Java 8 API doc

Book: Java Concurrency in Practice (JCIP)
