JUC自定義線程池練習

来源:https://www.cnblogs.com/duizhangz/archive/2022/05/11/16259337.html
-Advertisement-
Play Games

JUC自定義線程池練習 首先上面該線程池的大致流程 自定義阻塞隊列 首先定義一個雙向的隊列和鎖一定兩個等待的condition 本類用lock來控制多線程下的流程執行 take和push方法就是死等,調用await就是等,後面優化為限時等待 take調用後取出阻塞隊列的task後會調用fullWai ...


JUC自定義線程池練習

image

首先上面該線程池的大致流程

自定義阻塞隊列

  • 首先定義一個雙向的隊列和鎖一定兩個等待的condition
  • 本類用lock來控制多線程下的流程執行
  • take和push方法就是死等,調用await就是等,後面優化為限時等待
  • take調用後取出阻塞隊列的task後會調用fullWaitSet的signal方法來喚醒因為阻塞隊列滿了的線程將task放入阻塞隊列。
@Slf4j
class TaskQueue<T> {

    // 雙向的阻塞隊列
    private Deque<T> deque;
    // 隊列最大容量
    private int capacity;

    // 鎖
    private ReentrantLock lock = new ReentrantLock();
    // 消費者任務池空的等待隊列
    private Condition emptyWaitSet = lock.newCondition();
    // 生產者任務池滿的等待隊列
    private Condition fullWaitSet = lock.newCondition();


    public TaskQueue(int capacity) {
        this.capacity = capacity;
        deque = new ArrayDeque<>(capacity);
    }

    // 死等take,即從阻塞隊列取出任務
    public T take() {
        lock.lock();
        try {
            while (deque.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("取走任務");
            T task = deque.pollFirst();
            fullWaitSet.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    // 線程添加任務,屬於是死等添加
    public void push(T task) {
        lock.lock();
        try {
            while (deque.size() >= capacity) {
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("添加任務");
            deque.offerLast(task);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    public int getSize() {
        lock.lock();
        try {
            return deque.size();
        }finally {
            lock.unlock();
        }
    }

}

優化,死等優化為超時等

  • awaitNanos方法返回的是等待的剩餘時間,如果已經等了base時間就會返回0,如果沒有就會返回大於0即還沒有等待的時間,防止虛假喚醒導致重新等待時間加長。當然在本題的設計中不會出現虛假喚醒的情況。
public T poll(Long timeout,TimeUnit unit) {
    lock.lock();
    try {
        long base = unit.toNanos(timeout);
        while (deque.isEmpty()) {
            try {
                if (base <= 0){
                    return null;
                }
                base = emptyWaitSet.awaitNanos(base);  // 返回還剩下的時間
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        log.debug("取走任務");
        T task = deque.pollFirst();
        fullWaitSet.signal();
        return task;
    } finally {
        lock.unlock();
    }
}

線程池類

  • 成員變數如下,對於Worker就工作線程
@Slf4j
class ThreadPool {
    // 阻塞隊列大小
    private int capacity;
    // 阻塞隊列
    private TaskQueue<Runnable> taskQueue;
    // 工作線程
    private HashSet<Worker> workerSet = new HashSet<>();
    // 核心數
    private int coreNum;
    // 超時等待時間
    private long timeout;
    // 超時等待單位
    private TimeUnit unit;
    // 拒絕策略
    private RejectPolicy rejectPolicy;

    // 線程對象
    class Worker extends Thread {

        private Runnable task;

        public Worker(Runnable runnable) {
            this.task = runnable;
        }

        @Override
        public void run() {
            // 就是線程把當前分配的任務做完,然後還要去阻塞隊列找活乾,沒活就退出
            // taks 如果不為空就執行然後講其置為空,後續再次進入迴圈後會從阻塞隊列中再次取出task,
            // 如果不為空就繼續執行,但是因為take死等,會導致無法結束
            // 使用了這個超時等的方法,當無法取出時就會退出程式
            while (task != null || (task = taskQueue.poll(timeout,unit)) != null) {
                try {
                    log.debug("開始執行任務");
                    Thread.sleep(1000);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            // 當沒有任務可執行,線程自動銷毀,由於這是根據對象來銷毀,且hashset無序,所以這裡無需保證其的線程安全。
            workerSet.remove(this);
        }
    }


    public ThreadPool(int capacity, int coreNum, long timeout, TimeUnit unit,RejectPolicy rejectPolicy) {
        this.capacity = capacity;
        this.coreNum = coreNum;
        this.timeout = timeout;
        this.unit = unit;
        this.taskQueue = new TaskQueue<>(capacity);
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * 當線程數大於核心數,就將任務放入阻塞隊列
     * 否則創建線程進行處理
     *
     * @param runnable
     */
    public void execute(Runnable runnable) {
        // 需要synchronized關鍵字控制多線程下對執行方法的執行,保證共用變數workerSet安全。
        synchronized (workerSet) {
            // 如果已經存在的工作線程已經大於核心數,就不適合在進行創建線程了,創太多線程對於執行並不會加快,反而會因為線程不斷切換而拖累CPU的執行。
            if (workerSet.size() >= coreNum) {
                taskQueue.push(runnable);
            } else {
                // 如果工作線程小於核心數就可創建一個worker線程來工作
                Worker worker = new Worker(runnable);
                workerSet.add(worker);
                worker.start();
            }
        }
    }
}

測試類

@Slf4j
public class MyThreadPool {

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(3,2,1,TimeUnit.SECONDS,(taskQueue,task)->{
            taskQueue.push(task);
        });
        for (int i = 0; i < 10; i++) {
            int j = i;
            threadPool.execute(() -> {
                log.debug("任務{}", j);
            });
        }
    }

}

優化---拒絕策略

我們沒有進行優化的就是當任務太多導致阻塞線程也滿了,此時任務線程就會進行阻塞,直到等到有人線上程池中取走任務。也就是push方法,我們在舊的方法中仍採用的是死等的方法。

但是方法中有很多死等,超時等,放棄任務,拋出異常,讓調用者自己執行任務等等方法。

我們就可用講其進行抽象,把操作交給調用者。

定義瞭如下的函數式介面,即為拒絕策略。

@FunctionalInterface
interface RejectPolicy<T>{
    void reject(TaskQueue<T> taskQueue,T task);
}

將在TaskQueue任務隊列中定義不同的策略,我們只要傳入這個函數式介面的實現對象就可用實現定製拒絕的策略。

在TaskQueue類添加一個方法,用來調用拒絕策略

public void tryAndAdd(T task,RejectPolicy rejectPolicy){
    lock.lock();
    try {
        if (deque.size() >= capacity) {
            rejectPolicy.reject(this,task);
        }else{
            log.debug("添加任務");
            deque.offerLast(task);
            emptyWaitSet.signal();
        }
    } finally {
        lock.unlock();
    }
}

更改了構造方法的線程池類,這樣就可用傳入一個自定義的拒絕策略。

@Slf4j
class ThreadPool {
    // 阻塞隊列大小
    private int capacity;
    // 阻塞隊列
    private TaskQueue<Runnable> taskQueue;
    // 工作線程
    private HashSet<Worker> workerSet = new HashSet<>();
    // 核心數
    private int coreNum;
    // 超時等待時間
    private long timeout;
    // 超時等待單位
    private TimeUnit unit;
    // 拒絕策略
    private RejectPolicy rejectPolicy;

    // 線程對象
    class Worker extends Thread {

        private Runnable task;

        public Worker(Runnable runnable) {
            this.task = runnable;
        }

        @Override
        public void run() {
            while (task != null || (task = taskQueue.poll(timeout,unit)) != null) {
                try {
                    log.debug("開始執行任務");
                    Thread.sleep(1000);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            workerSet.remove(this);
        }
    }


    public ThreadPool(int capacity, int coreNum, long timeout, TimeUnit unit,RejectPolicy rejectPolicy) {
        this.capacity = capacity;
        this.coreNum = coreNum;
        this.timeout = timeout;
        this.unit = unit;
        this.taskQueue = new TaskQueue<>(capacity);
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * 當線程數大於核心數,就將任務放入阻塞隊列
     * 否則創建線程進行處理
     *
     * @param runnable
     */
    public void execute(Runnable runnable) {
        synchronized (workerSet) {
            if (workerSet.size() >= coreNum) {
                taskQueue.tryAndAdd(runnable,rejectPolicy);
            } else {
                Worker worker = new Worker(runnable);
                workerSet.add(worker);
                worker.start();
            }
        }
    }
}

將啟動類修改如下

@Slf4j
public class MyThreadPool {

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(3,2,1,TimeUnit.SECONDS,(taskQueue,task)->{
            // 採用死等的方法,當然我們可用在taskQueue中定義更多的方法讓調用者選擇
            taskQueue.push(task);
        });
        for (int i = 0; i < 10; i++) {
            int j = i;
            threadPool.execute(() -> {
                log.debug("任務{}", j);
            });
        }
    }

}

這樣我們就完成了自定義的線程池。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 至此基本上vue2.0的內容全部結束,後面還有點elementUI和vue3.0的內容過幾天再來更新。 這幾天要回學校去參加畢業答辯,斷更幾天 一.相關理解 是vue的一個插件庫,專門用來實現spa(單頁面應用)的,也就是一直都是一個index.html頁面,他有他的導航區和展示區,雖然只有一個頁面 ...
  • markRaw 作用:標記一個對象,使其永遠不會再成為響應式對象 應用場景: 1.有些值不應被設置成響應式時,例如複雜的第三方類庫等 2.當渲染具有不可變數據源的大列表時,跳過響應式轉換可以提高性能 3.在動態渲染組件的時候我們就可以使用 markRaw 包裹。 markRaw 的使用場景 很多時候 ...
  • 在 CSS 中,其實存在各種各樣的函數。具體分為: Transform functions Math functions Filter functions Color functions Image functions Counter functions Font functions Shape f ...
  • 1.“new”有什麼不對勁? 在我們沒有接觸到工廠模式(簡單工廠、工廠方法模式、抽象工廠模式)之前,我們實例化對象唯一的方法就是通過“new”關鍵字來完成。但是,大量的使用“new”關鍵字來實例化對象會違背一些設計原則,因為代碼與具體的類型綁在一起,從而導致過多的依賴於細節而非抽象,這樣代碼就很難適 ...
  • STL初步認識:介紹了【什麼是STL】,以及【STL庫里的常見容器】,也對【迭代器】進行了簡短的說明。 ...
  • 一、if判斷語句 if語句是用來進行判斷的,其使用格式如下: if 要判斷的條件: 條件成立時要做的事 Demo age = input('輸入年齡:') #input返回的是字元串類型 if int(age) > 18: #這裡對age做強制類型轉換 字元串和整數int不可比較 print('你成 ...
  • JDK自帶線程池 線程池的狀態 線程有如下狀態 RUNNING狀態:Accept new tasks and process queued tasks SHUTDOWN狀態:Don't accept new tasks, but process queued tasks STOP狀態: Don't ...
  • 跟著教程寫了幾種方法,才發現自己寫的雖然能實現,但比較繁瑣。教程有三種方法: 1.移位法,每次左移一位,相比我自己寫的,優點是不用把每一種情況都寫出來。但是需要考慮左移到最後一位時需要自己再寫個賦值語句重新回到第一位。 2.位拼接法,迴圈左移,每一次都把最後一位放到第一位,其他六位左移一位,剋服了移 ...
一周排行
    -Advertisement-
    Play Games
  • 1. 說明 /* Performs operations on System.String instances that contain file or directory path information. These operations are performed in a cross-pla ...
  • 視頻地址:【WebApi+Vue3從0到1搭建《許可權管理系統》系列視頻:搭建JWT系統鑒權-嗶哩嗶哩】 https://b23.tv/R6cOcDO qq群:801913255 一、在appsettings.json中設置鑒權屬性 /*jwt鑒權*/ "JwtSetting": { "Issuer" ...
  • 引言 集成測試可在包含應用支持基礎結構(如資料庫、文件系統和網路)的級別上確保應用組件功能正常。 ASP.NET Core 通過將單元測試框架與測試 Web 主機和記憶體中測試伺服器結合使用來支持集成測試。 簡介 集成測試與單元測試相比,能夠在更廣泛的級別上評估應用的組件,確認多個組件一起工作以生成預 ...
  • 在.NET Emit編程中,我們探討了運算操作指令的重要性和應用。這些指令包括各種數學運算、位操作和比較操作,能夠在動態生成的代碼中實現對數據的處理和操作。通過這些指令,開發人員可以靈活地進行算術運算、邏輯運算和比較操作,從而實現各種複雜的演算法和邏輯......本篇之後,將進入第七部分:實戰項目 ...
  • 前言 多表頭表格是一個常見的業務需求,然而WPF中卻沒有預設實現這個功能,得益於WPF強大的控制項模板設計,我們可以通過修改控制項模板的方式自己實現它。 一、需求分析 下圖為一個典型的統計表格,統計1-12月的數據。 此時我們有一個需求,需要將月份按季度劃分,以便能夠直觀地看到季度統計數據,以下為該需求 ...
  • 如何將 ASP.NET Core MVC 項目的視圖分離到另一個項目 在當下這個年代 SPA 已是主流,人們早已忘記了 MVC 以及 Razor 的故事。但是在某些場景下 SSR 還是有意想不到效果。比如某些靜態頁面,比如追求首屏載入速度的時候。最近在項目中回歸傳統效果還是不錯。 有的時候我們希望將 ...
  • System.AggregateException: 發生一個或多個錯誤。 > Microsoft.WebTools.Shared.Exceptions.WebToolsException: 生成失敗。檢查輸出視窗瞭解更多詳細信息。 內部異常堆棧跟蹤的結尾 > (內部異常 #0) Microsoft ...
  • 引言 在上一章節我們實戰了在Asp.Net Core中的項目實戰,這一章節講解一下如何測試Asp.Net Core的中間件。 TestServer 還記得我們在集成測試中提供的TestServer嗎? TestServer 是由 Microsoft.AspNetCore.TestHost 包提供的。 ...
  • 在發現結果為真的WHEN子句時,CASE表達式的真假值判斷會終止,剩餘的WHEN子句會被忽略: CASE WHEN col_1 IN ('a', 'b') THEN '第一' WHEN col_1 IN ('a') THEN '第二' ELSE '其他' END 註意: 統一各分支返回的數據類型. ...
  • 在C#編程世界中,語法的精妙之處往往體現在那些看似微小卻極具影響力的符號與結構之中。其中,“_ =” 這一組合突然出現還真不知道什麼意思。本文將深入剖析“_ =” 的含義、工作原理及其在實際編程中的廣泛應用,揭示其作為C#語法奇兵的重要角色。 一、下劃線 _:神秘的棄元符號 下劃線 _ 在C#中並非 ...