JUC自定義線程池練習

来源:https://www.cnblogs.com/duizhangz/archive/2022/05/11/16259337.html
-Advertisement-
Play Games

JUC自定義線程池練習 首先上面該線程池的大致流程 自定義阻塞隊列 首先定義一個雙向的隊列和鎖一定兩個等待的condition 本類用lock來控制多線程下的流程執行 take和push方法就是死等,調用await就是等,後面優化為限時等待 take調用後取出阻塞隊列的task後會調用fullWai ...


JUC自定義線程池練習

image

首先上面該線程池的大致流程

自定義阻塞隊列

  • 首先定義一個雙向的隊列和鎖一定兩個等待的condition
  • 本類用lock來控制多線程下的流程執行
  • take和push方法就是死等,調用await就是等,後面優化為限時等待
  • take調用後取出阻塞隊列的task後會調用fullWaitSet的signal方法來喚醒因為阻塞隊列滿了的線程將task放入阻塞隊列。
@Slf4j
class TaskQueue<T> {

    // 雙向的阻塞隊列
    private Deque<T> deque;
    // 隊列最大容量
    private int capacity;

    // 鎖
    private ReentrantLock lock = new ReentrantLock();
    // 消費者任務池空的等待隊列
    private Condition emptyWaitSet = lock.newCondition();
    // 生產者任務池滿的等待隊列
    private Condition fullWaitSet = lock.newCondition();


    public TaskQueue(int capacity) {
        this.capacity = capacity;
        deque = new ArrayDeque<>(capacity);
    }

    // 死等take,即從阻塞隊列取出任務
    public T take() {
        lock.lock();
        try {
            while (deque.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("取走任務");
            T task = deque.pollFirst();
            fullWaitSet.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    // 線程添加任務,屬於是死等添加
    public void push(T task) {
        lock.lock();
        try {
            while (deque.size() >= capacity) {
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("添加任務");
            deque.offerLast(task);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    public int getSize() {
        lock.lock();
        try {
            return deque.size();
        }finally {
            lock.unlock();
        }
    }

}

優化,死等優化為超時等

  • awaitNanos方法返回的是等待的剩餘時間,如果已經等了base時間就會返回0,如果沒有就會返回大於0即還沒有等待的時間,防止虛假喚醒導致重新等待時間加長。當然在本題的設計中不會出現虛假喚醒的情況。
public T poll(Long timeout,TimeUnit unit) {
    lock.lock();
    try {
        long base = unit.toNanos(timeout);
        while (deque.isEmpty()) {
            try {
                if (base <= 0){
                    return null;
                }
                base = emptyWaitSet.awaitNanos(base);  // 返回還剩下的時間
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        log.debug("取走任務");
        T task = deque.pollFirst();
        fullWaitSet.signal();
        return task;
    } finally {
        lock.unlock();
    }
}

線程池類

  • 成員變數如下,對於Worker就工作線程
@Slf4j
class ThreadPool {
    // 阻塞隊列大小
    private int capacity;
    // 阻塞隊列
    private TaskQueue<Runnable> taskQueue;
    // 工作線程
    private HashSet<Worker> workerSet = new HashSet<>();
    // 核心數
    private int coreNum;
    // 超時等待時間
    private long timeout;
    // 超時等待單位
    private TimeUnit unit;
    // 拒絕策略
    private RejectPolicy rejectPolicy;

    // 線程對象
    class Worker extends Thread {

        private Runnable task;

        public Worker(Runnable runnable) {
            this.task = runnable;
        }

        @Override
        public void run() {
            // 就是線程把當前分配的任務做完,然後還要去阻塞隊列找活乾,沒活就退出
            // taks 如果不為空就執行然後講其置為空,後續再次進入迴圈後會從阻塞隊列中再次取出task,
            // 如果不為空就繼續執行,但是因為take死等,會導致無法結束
            // 使用了這個超時等的方法,當無法取出時就會退出程式
            while (task != null || (task = taskQueue.poll(timeout,unit)) != null) {
                try {
                    log.debug("開始執行任務");
                    Thread.sleep(1000);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            // 當沒有任務可執行,線程自動銷毀,由於這是根據對象來銷毀,且hashset無序,所以這裡無需保證其的線程安全。
            workerSet.remove(this);
        }
    }


    public ThreadPool(int capacity, int coreNum, long timeout, TimeUnit unit,RejectPolicy rejectPolicy) {
        this.capacity = capacity;
        this.coreNum = coreNum;
        this.timeout = timeout;
        this.unit = unit;
        this.taskQueue = new TaskQueue<>(capacity);
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * 當線程數大於核心數,就將任務放入阻塞隊列
     * 否則創建線程進行處理
     *
     * @param runnable
     */
    public void execute(Runnable runnable) {
        // 需要synchronized關鍵字控制多線程下對執行方法的執行,保證共用變數workerSet安全。
        synchronized (workerSet) {
            // 如果已經存在的工作線程已經大於核心數,就不適合在進行創建線程了,創太多線程對於執行並不會加快,反而會因為線程不斷切換而拖累CPU的執行。
            if (workerSet.size() >= coreNum) {
                taskQueue.push(runnable);
            } else {
                // 如果工作線程小於核心數就可創建一個worker線程來工作
                Worker worker = new Worker(runnable);
                workerSet.add(worker);
                worker.start();
            }
        }
    }
}

測試類

@Slf4j
public class MyThreadPool {

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(3,2,1,TimeUnit.SECONDS,(taskQueue,task)->{
            taskQueue.push(task);
        });
        for (int i = 0; i < 10; i++) {
            int j = i;
            threadPool.execute(() -> {
                log.debug("任務{}", j);
            });
        }
    }

}

優化---拒絕策略

我們沒有進行優化的就是當任務太多導致阻塞線程也滿了,此時任務線程就會進行阻塞,直到等到有人線上程池中取走任務。也就是push方法,我們在舊的方法中仍採用的是死等的方法。

但是方法中有很多死等,超時等,放棄任務,拋出異常,讓調用者自己執行任務等等方法。

我們就可用講其進行抽象,把操作交給調用者。

定義瞭如下的函數式介面,即為拒絕策略。

@FunctionalInterface
interface RejectPolicy<T>{
    void reject(TaskQueue<T> taskQueue,T task);
}

將在TaskQueue任務隊列中定義不同的策略,我們只要傳入這個函數式介面的實現對象就可用實現定製拒絕的策略。

在TaskQueue類添加一個方法,用來調用拒絕策略

public void tryAndAdd(T task,RejectPolicy rejectPolicy){
    lock.lock();
    try {
        if (deque.size() >= capacity) {
            rejectPolicy.reject(this,task);
        }else{
            log.debug("添加任務");
            deque.offerLast(task);
            emptyWaitSet.signal();
        }
    } finally {
        lock.unlock();
    }
}

更改了構造方法的線程池類,這樣就可用傳入一個自定義的拒絕策略。

@Slf4j
class ThreadPool {
    // 阻塞隊列大小
    private int capacity;
    // 阻塞隊列
    private TaskQueue<Runnable> taskQueue;
    // 工作線程
    private HashSet<Worker> workerSet = new HashSet<>();
    // 核心數
    private int coreNum;
    // 超時等待時間
    private long timeout;
    // 超時等待單位
    private TimeUnit unit;
    // 拒絕策略
    private RejectPolicy rejectPolicy;

    // 線程對象
    class Worker extends Thread {

        private Runnable task;

        public Worker(Runnable runnable) {
            this.task = runnable;
        }

        @Override
        public void run() {
            while (task != null || (task = taskQueue.poll(timeout,unit)) != null) {
                try {
                    log.debug("開始執行任務");
                    Thread.sleep(1000);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            workerSet.remove(this);
        }
    }


    public ThreadPool(int capacity, int coreNum, long timeout, TimeUnit unit,RejectPolicy rejectPolicy) {
        this.capacity = capacity;
        this.coreNum = coreNum;
        this.timeout = timeout;
        this.unit = unit;
        this.taskQueue = new TaskQueue<>(capacity);
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * 當線程數大於核心數,就將任務放入阻塞隊列
     * 否則創建線程進行處理
     *
     * @param runnable
     */
    public void execute(Runnable runnable) {
        synchronized (workerSet) {
            if (workerSet.size() >= coreNum) {
                taskQueue.tryAndAdd(runnable,rejectPolicy);
            } else {
                Worker worker = new Worker(runnable);
                workerSet.add(worker);
                worker.start();
            }
        }
    }
}

將啟動類修改如下

@Slf4j
public class MyThreadPool {

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(3,2,1,TimeUnit.SECONDS,(taskQueue,task)->{
            // 採用死等的方法,當然我們可用在taskQueue中定義更多的方法讓調用者選擇
            taskQueue.push(task);
        });
        for (int i = 0; i < 10; i++) {
            int j = i;
            threadPool.execute(() -> {
                log.debug("任務{}", j);
            });
        }
    }

}

這樣我們就完成了自定義的線程池。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 至此基本上vue2.0的內容全部結束,後面還有點elementUI和vue3.0的內容過幾天再來更新。 這幾天要回學校去參加畢業答辯,斷更幾天 一.相關理解 是vue的一個插件庫,專門用來實現spa(單頁面應用)的,也就是一直都是一個index.html頁面,他有他的導航區和展示區,雖然只有一個頁面 ...
  • markRaw 作用:標記一個對象,使其永遠不會再成為響應式對象 應用場景: 1.有些值不應被設置成響應式時,例如複雜的第三方類庫等 2.當渲染具有不可變數據源的大列表時,跳過響應式轉換可以提高性能 3.在動態渲染組件的時候我們就可以使用 markRaw 包裹。 markRaw 的使用場景 很多時候 ...
  • 在 CSS 中,其實存在各種各樣的函數。具體分為: Transform functions Math functions Filter functions Color functions Image functions Counter functions Font functions Shape f ...
  • 1.“new”有什麼不對勁? 在我們沒有接觸到工廠模式(簡單工廠、工廠方法模式、抽象工廠模式)之前,我們實例化對象唯一的方法就是通過“new”關鍵字來完成。但是,大量的使用“new”關鍵字來實例化對象會違背一些設計原則,因為代碼與具體的類型綁在一起,從而導致過多的依賴於細節而非抽象,這樣代碼就很難適 ...
  • STL初步認識:介紹了【什麼是STL】,以及【STL庫里的常見容器】,也對【迭代器】進行了簡短的說明。 ...
  • 一、if判斷語句 if語句是用來進行判斷的,其使用格式如下: if 要判斷的條件: 條件成立時要做的事 Demo age = input('輸入年齡:') #input返回的是字元串類型 if int(age) > 18: #這裡對age做強制類型轉換 字元串和整數int不可比較 print('你成 ...
  • JDK自帶線程池 線程池的狀態 線程有如下狀態 RUNNING狀態:Accept new tasks and process queued tasks SHUTDOWN狀態:Don't accept new tasks, but process queued tasks STOP狀態: Don't ...
  • 跟著教程寫了幾種方法,才發現自己寫的雖然能實現,但比較繁瑣。教程有三種方法: 1.移位法,每次左移一位,相比我自己寫的,優點是不用把每一種情況都寫出來。但是需要考慮左移到最後一位時需要自己再寫個賦值語句重新回到第一位。 2.位拼接法,迴圈左移,每一次都把最後一位放到第一位,其他六位左移一位,剋服了移 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...