JUC學習-線程池部分

来源:https://www.cnblogs.com/appletree24/archive/2023/02/21/17142610.html
-Advertisement-
Play Games

自定義線程池 package com.appletree24; import java.util.ArrayDeque; import java.util.Deque; import java.util.HashSet; import java.util.concurrent.ExecutionEx ...


自定義線程池

package com.appletree24;  
  
import java.util.ArrayDeque;  
import java.util.Deque;  
import java.util.HashSet;  
import java.util.concurrent.ExecutionException;  
import java.util.concurrent.TimeUnit;  
import java.util.concurrent.locks.Condition;  
import java.util.concurrent.locks.ReentrantLock;  
  
class Main {  
    public static void main(String[] args) throws ExecutionException, InterruptedException {  
        ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MICROSECONDS, 5, (queue, task) -> {  
            //帶超時等待  
//            queue.offer(task,500,TimeUnit.MILLISECONDS);  
        });  
        for (int i = 0; i < 10; i++) {  
            int j = i;  
            threadPool.execute(() -> {  
                try {  
                    Thread.sleep(1000L);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
                System.out.println(j);  
            });  
        }  
    }  
}  
  
//策略模式介面 此處使用策略模式是因為在實現拒絕策略時,有許多種拒絕的方式,這些方式如果不使用恰當的模式,就需要大量的if..else來編寫  
//且方式數量大於4個,會造成類膨脹的問題,推薦使用混合模式  
//https://www.runoob.com/design-pattern/strategy-pattern.html  
@FunctionalInterface  
interface RejectPolicy<T> {  
    void reject(BlockingQueue<T> queue, T task);  
}  
  
class ThreadPool {  
    //任務隊列  
    private BlockingQueue<Runnable> taskQueue;  
    //線程集合  
    private HashSet<Worker> workers = new HashSet<>();  
  
    //線程數  
    private int coreSize;  
  
    //超時時間  
    private long timeout;  
  
    private TimeUnit timeUnit;  
    private RejectPolicy<Runnable> rejectPolicy;  
  
    //執行任務  
    public void execute(Runnable task) {  
        //當任務數未超過核心線程數時,直接交給Worker對象執行  
        //如果超過,則加入阻塞任務隊列,暫存起來  
        synchronized (workers) {  
            if (workers.size() < coreSize) {  
                Worker worker = new Worker(task);  
                workers.add(worker);  
                worker.start();  
            } else {  
                //第一種選擇死等  
//                taskQueue.put(task);  
                //第二種為超時等待  
                //第三種為消費者放棄任務執行  
                //第四種為主線程拋出異常  
                //第五種讓調用者自行執行任務  
                taskQueue.tryPut(rejectPolicy, task);  
            }  
        }  
    }  
  
    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {  
        this.coreSize = coreSize;  
        this.timeout = timeout;  
        this.timeUnit = timeUnit;  
        this.taskQueue = new BlockingQueue<>(queueCapcity);  
        this.rejectPolicy = rejectPolicy;  
    }  
  
    class Worker extends Thread {  
        private Runnable task;  
  
        public Worker(Runnable task) {  
            this.task = task;  
        }  
  
        @Override  
        public void run() {  
            //執行任務  
            //1.當傳遞過來的task不為空,執行任務  
            //2.當task執行完畢,再接著取下一個任務並執行  
            while (task != null || (task = taskQueue.poll(1000, TimeUnit.MILLISECONDS)) != null) {  
                try {  
                    task.run();  
                } catch (Exception e) {  
                    e.printStackTrace();  
                } finally {  
                    task = null;  
                }  
            }  
            synchronized (workers) {  
                workers.remove(this);  
            }  
        }  
    }  
}  
  
class BlockingQueue<T> {  
    //1. 任務隊列  
    private final Deque<T> queue = new ArrayDeque<>();  
  
    //2. 鎖  
    private final ReentrantLock lock = new ReentrantLock();  
  
    //3. 生產者條件變數  
    private final Condition fullWaitSet = lock.newCondition();  
  
    //4. 消費者條件變數  
    private final Condition emptyWaitSet = lock.newCondition();  
  
    //5. 容量上限  
    private int capcity;  
  
    public BlockingQueue(int capcity) {  
        this.capcity = capcity;  
    }  
  
    //帶超時的等待獲取  
    public T poll(long timeout, TimeUnit unit) {  
        lock.lock();  
        long nanos = unit.toNanos(timeout);  
        try {  
            while (queue.isEmpty()) {  
                try {  
                    if (nanos <= 0) {  
                        return null;  
                    }  
                    nanos = emptyWaitSet.awaitNanos(nanos);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            T t = queue.removeFirst();  
            fullWaitSet.signal();  
            return t;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    //消費者拿取任務的方法  
    public T take() {  
        lock.lock();  
        try {  
            while (queue.isEmpty()) {  
                try {  
                    emptyWaitSet.await();  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            T t = queue.removeFirst();  
            fullWaitSet.signal();  
            return t;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    //阻塞添加  
    public void put(T task) {  
        lock.lock();  
        try {  
            while (queue.size() == capcity) {  
                try {  
                    fullWaitSet.await();  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            queue.offerLast(task);  
            //添加完後喚醒消費者等待  
            emptyWaitSet.signal();  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    //帶超時時間的阻塞添加  
    public boolean offer(T task, long timeout, TimeUnit unit) {  
        lock.lock();  
        try {  
            long nanos = unit.toNanos(timeout);  
            while (queue.size() == capcity) {  
                try {  
                    if (nanos <= 0) return false;  
                    nanos = fullWaitSet.awaitNanos(nanos);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            queue.offerLast(task);  
            //添加完後喚醒消費者等待  
            emptyWaitSet.signal();  
            return true;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
  
    //獲取當前阻塞隊列大小  
    public int size() {  
        lock.lock();  
        try {  
            return queue.size();  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {  
        lock.lock();  
        try {  
            //判斷隊列是否已滿  
            if (queue.size() == capcity) {  
                rejectPolicy.reject(this, task);  
            } else {  
                queue.addLast(task);  
                emptyWaitSet.signal();  
            }  
        } finally {  
            lock.unlock();  
        }  
    }  
}

上述的自定義線程池雖然能夠執行完畢主線程給予的任務,但任務全部執行結束後,開闢的線程池內核心線程仍然在運行,並沒有結束,這是因為目前線程池中的take方法仍然為不會有超時等待的take方法,造成了死等,需要為其加入超時停止的功能。也就是替代take()的poll()

JDK自帶線程池

介紹

image.png
ThreadPoolExecutor使用int的高三位表示線程池狀態,低29位表示線程數量
image.png
image.png
在ThreadPoolExecutor中,同樣也存在拒絕策略。其圖結構如下:
image.png
其中介面就對應著在自定義線程池中實現的策略模式介面,下麵的四個實現類就對應著四種不同的拒絕方式:
image.png

利用工具類創建固定大小線程池

image.png

利用工具類創建帶緩衝的線程池

image.png
從源碼可以看出,帶緩衝的線程池中緩衝隊列的使用的是一個名為SynchronousQueue的隊列,這個隊列的特點如下:隊列不具有容量,當沒有線程來取時,是無法對其內部放入數據的,例如隊列內部已有一個數字1,但此時沒有線程取走,則線此隊列目前並不能繼續存入數據,直到1被取走

利用工具類創建單線程線程池

image.png
從源碼可以看出,單線程線程池中核心線程數與最大線程數相等,即不存在應急線程。只能解決一個任務
那麼這個線程池和我自己創建一個線程的線程池有什麼區別呢?區別如下:
image.png

ThreadPoolExecutor-submit method

public static void main(String[] args) throws ExecutionException, InterruptedException {  
    ExecutorService pool = Executors.newFixedThreadPool(2);  
    Future<String> result = pool.submit(() -> {  
        System.out.println("running");  
        Thread.sleep(1000);  
        return "ok";  
    });  
    System.out.println(result.get());  
}

submit方法可以傳入Runnable和Callable類型的參數,並且將線程內部所執行任務的結果返回,用Future包裝

ThreadPoolExecutor-invokeAll

public static void main(String[] args) throws ExecutionException, InterruptedException {  
    ExecutorService pool = Executors.newFixedThreadPool(2);  
    List<Future<String>> results = pool.invokeAll(Arrays.asList(() -> {  
                System.out.println("begin");  
                Thread.sleep(1000);  
                return "1";  
            },  
            () -> {  
                System.out.println("begin");  
                Thread.sleep(500);  
                return "2";  
            }));  
    results.forEach(f -> {  
        try {  
            System.out.printf(f.get());  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        } catch (ExecutionException e) {  
            e.printStackTrace();  
        }  
    });  
}

invokeAll方法可以傳入任務的集合,同樣的任務的返回值也會以列表形式返回

ThreadPoolExecutor-invokeAny

public static void main(String[] args) throws ExecutionException, InterruptedException {  
    ExecutorService pool = Executors.newFixedThreadPool(2);  
    String result = pool.invokeAny(Arrays.asList(() -> {  
                System.out.println("begin");  
                Thread.sleep(1000);  
                return "1";  
            },  
            () -> {  
                System.out.println("begin");  
                Thread.sleep(500);  
                return "2";  
            }));  
    pool.awaitTermination(1000, TimeUnit.MILLISECONDS);  
    System.out.println(result);  
}

invokeAny方法同樣可以傳入任務的集合,只不過最後返回的結果並不是任務的結果集合,而是最早完成的那個任務的結果。

ThreadPoolExecutor-shutdown

public static void main(String[] args) throws ExecutionException, InterruptedException {  
    ExecutorService pool = Executors.newFixedThreadPool(2);  
    List<Future<String>> results = pool.invokeAll(Arrays.asList(() -> {  
                System.out.println("begin");  
                Thread.sleep(1000);  
                return "1";  
            },  
            () -> {  
                System.out.println("begin");  
                Thread.sleep(500);  
                return "2";  
            }));  
    pool.shutdown();  
    results.forEach(f -> {  
        try {  
            System.out.println(f.get());  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        } catch (ExecutionException e) {  
            e.printStackTrace();  
        }  
    });  
}

shutdown方法會將線程池的狀態變為SHUTDOWN

  • 不會接受新任務
  • 但已提交的任務會執行完
  • 此方法不會阻塞調用線程的執行

ThreadPoolExecutor-shutdownNow

public static void main(String[] args) throws ExecutionException, InterruptedException {  
    ExecutorService pool = Executors.newFixedThreadPool(2);  
    List<Future<String>> results = pool.invokeAll(Arrays.asList(() -> {  
                System.out.println("begin");  
                Thread.sleep(1000);  
                return "1";  
            },  
            () -> {  
                System.out.println("begin");  
                Thread.sleep(500);  
                return "2";  
            }));  
    List<Runnable> runnables = pool.shutdownNow();  
    results.forEach(f -> {  
        try {  
            System.out.println(f.get());  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        } catch (ExecutionException e) {  
            e.printStackTrace();  
        }  
    });  
}

shutdownNow方法會將線程池狀態變為STOP

  • 不會接受新任務
  • 會將隊列中現有的任務返回
  • 並且用interrupt方法中斷正在執行的任務

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 本文是系列第二篇。系列文章: 現代圖片性能優化及體驗優化指南 - 圖片類型及 Picture 標簽的使用 圖片資源,在我們的業務中可謂是占據了非常大頭的一環,尤其是其對帶寬的消耗是十分巨大的。 對圖片的性能優化及體驗優化在今天就顯得尤為重要。本文,就將從各個方面闡述,在各種新特性滿頭飛的今天,我們可 ...
  • 一、背景 遠程服務將電腦程式的工作範圍從單機擴展到網路,從本地延伸至遠程,是構建分散式系統的首要基礎。遠程服務調用(Remote Procedure Call,RPC)在電腦科學中已經存在了超過四十年時間。但很多人無法明確區分RPC與Rest。本文就講一講RPC和Rest的本質區別。 二、分析 ...
  • 談到java中的併發,我們就避不開線程之間的同步和協作問題,談到線程同步和協作我們就不能不談談jdk中提供的AbstractQueuedSynchronizer(翻譯過來就是抽象的隊列同步器)機制; (一)、AQS中的state和Node含義: AQS中提供了一個int volatile state ...
  • RabbitMQ 配置環境 安裝 erlang環境以及RabbitMQ RabbitMQ埠號: 5672 去官網下載 https://www.rabbitmq.com 然後重啟RabbitMQ服務 RabbitMQ安裝教程 開放埠15672 這裡,通過http://IP地址:15672 進行We ...
  • 在如 rails 這樣的開源庫中,我們常常見到這樣的一類寫法: class_eval <<-RUBY, lxx_file, lxx_line + 1 def xxx # do something here end RUBY 令人困惑不已。 不過這裡的知識點非常簡單,只要掌握了 heredoc 與 e ...
  • 前言 文章主要說明在FA中的中文函數的代碼實現,不僅要知道用法,更要知其實現的原理。前面的用法為FA中的用法,僅支持在FA中使用,源碼可以在其它app中使用。 非原創,代碼收集整理於網路。 進入子頁面 進入子頁面("頁面名稱") lua源碼: function 進入子頁面(name,param) i ...
  • 0、引言 我們在嵌入式開發的過程中,經常可以碰到在一些巨集定義或者是代碼段中使用了do {...} while(0)的語句,從語義上理解,do {...} while(0)內的邏輯就只執行一次,並沒有迴圈執行,粗略看來,似乎畫蛇添足了,那麼為什麼還需要在只執行一次的邏輯外面加上一層do {...} w ...
  • 隊列的概念 在說隊列之前,先回憶一下棧是什麼,我們一般說棧是一個先進後出的數據結構,而隊列就是先進先出的數據結構。 隊列是定在表的一端進行插入,表的另一端進行刪除。 通常,我們稱進數據的一端為隊尾,出數據的一端為隊首(這邊需要註意,經常會記反起碼我是這樣的),數據元素進隊列的過程稱為入隊,出隊列的過 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...