JUC自定義線程池練習

来源:https://www.cnblogs.com/duizhangz/archive/2022/05/11/16259337.html
-Advertisement-
Play Games

JUC自定義線程池練習 首先上面該線程池的大致流程 自定義阻塞隊列 首先定義一個雙向的隊列和鎖一定兩個等待的condition 本類用lock來控制多線程下的流程執行 take和push方法就是死等,調用await就是等,後面優化為限時等待 take調用後取出阻塞隊列的task後會調用fullWai ...


JUC自定義線程池練習

image

首先上面該線程池的大致流程

自定義阻塞隊列

  • 首先定義一個雙向的隊列和鎖一定兩個等待的condition
  • 本類用lock來控制多線程下的流程執行
  • take和push方法就是死等,調用await就是等,後面優化為限時等待
  • take調用後取出阻塞隊列的task後會調用fullWaitSet的signal方法來喚醒因為阻塞隊列滿了的線程將task放入阻塞隊列。
@Slf4j
class TaskQueue<T> {

    // 雙向的阻塞隊列
    private Deque<T> deque;
    // 隊列最大容量
    private int capacity;

    // 鎖
    private ReentrantLock lock = new ReentrantLock();
    // 消費者任務池空的等待隊列
    private Condition emptyWaitSet = lock.newCondition();
    // 生產者任務池滿的等待隊列
    private Condition fullWaitSet = lock.newCondition();


    public TaskQueue(int capacity) {
        this.capacity = capacity;
        deque = new ArrayDeque<>(capacity);
    }

    // 死等take,即從阻塞隊列取出任務
    public T take() {
        lock.lock();
        try {
            while (deque.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("取走任務");
            T task = deque.pollFirst();
            fullWaitSet.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    // 線程添加任務,屬於是死等添加
    public void push(T task) {
        lock.lock();
        try {
            while (deque.size() >= capacity) {
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("添加任務");
            deque.offerLast(task);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    public int getSize() {
        lock.lock();
        try {
            return deque.size();
        }finally {
            lock.unlock();
        }
    }

}

優化,死等優化為超時等

  • awaitNanos方法返回的是等待的剩餘時間,如果已經等了base時間就會返回0,如果沒有就會返回大於0即還沒有等待的時間,防止虛假喚醒導致重新等待時間加長。當然在本題的設計中不會出現虛假喚醒的情況。
public T poll(Long timeout,TimeUnit unit) {
    lock.lock();
    try {
        long base = unit.toNanos(timeout);
        while (deque.isEmpty()) {
            try {
                if (base <= 0){
                    return null;
                }
                base = emptyWaitSet.awaitNanos(base);  // 返回還剩下的時間
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        log.debug("取走任務");
        T task = deque.pollFirst();
        fullWaitSet.signal();
        return task;
    } finally {
        lock.unlock();
    }
}

線程池類

  • 成員變數如下,對於Worker就工作線程
@Slf4j
class ThreadPool {
    // 阻塞隊列大小
    private int capacity;
    // 阻塞隊列
    private TaskQueue<Runnable> taskQueue;
    // 工作線程
    private HashSet<Worker> workerSet = new HashSet<>();
    // 核心數
    private int coreNum;
    // 超時等待時間
    private long timeout;
    // 超時等待單位
    private TimeUnit unit;
    // 拒絕策略
    private RejectPolicy rejectPolicy;

    // 線程對象
    class Worker extends Thread {

        private Runnable task;

        public Worker(Runnable runnable) {
            this.task = runnable;
        }

        @Override
        public void run() {
            // 就是線程把當前分配的任務做完,然後還要去阻塞隊列找活乾,沒活就退出
            // taks 如果不為空就執行然後講其置為空,後續再次進入迴圈後會從阻塞隊列中再次取出task,
            // 如果不為空就繼續執行,但是因為take死等,會導致無法結束
            // 使用了這個超時等的方法,當無法取出時就會退出程式
            while (task != null || (task = taskQueue.poll(timeout,unit)) != null) {
                try {
                    log.debug("開始執行任務");
                    Thread.sleep(1000);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            // 當沒有任務可執行,線程自動銷毀,由於這是根據對象來銷毀,且hashset無序,所以這裡無需保證其的線程安全。
            workerSet.remove(this);
        }
    }


    public ThreadPool(int capacity, int coreNum, long timeout, TimeUnit unit,RejectPolicy rejectPolicy) {
        this.capacity = capacity;
        this.coreNum = coreNum;
        this.timeout = timeout;
        this.unit = unit;
        this.taskQueue = new TaskQueue<>(capacity);
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * 當線程數大於核心數,就將任務放入阻塞隊列
     * 否則創建線程進行處理
     *
     * @param runnable
     */
    public void execute(Runnable runnable) {
        // 需要synchronized關鍵字控制多線程下對執行方法的執行,保證共用變數workerSet安全。
        synchronized (workerSet) {
            // 如果已經存在的工作線程已經大於核心數,就不適合在進行創建線程了,創太多線程對於執行並不會加快,反而會因為線程不斷切換而拖累CPU的執行。
            if (workerSet.size() >= coreNum) {
                taskQueue.push(runnable);
            } else {
                // 如果工作線程小於核心數就可創建一個worker線程來工作
                Worker worker = new Worker(runnable);
                workerSet.add(worker);
                worker.start();
            }
        }
    }
}

測試類

@Slf4j
public class MyThreadPool {

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(3,2,1,TimeUnit.SECONDS,(taskQueue,task)->{
            taskQueue.push(task);
        });
        for (int i = 0; i < 10; i++) {
            int j = i;
            threadPool.execute(() -> {
                log.debug("任務{}", j);
            });
        }
    }

}

優化---拒絕策略

我們沒有進行優化的就是當任務太多導致阻塞線程也滿了,此時任務線程就會進行阻塞,直到等到有人線上程池中取走任務。也就是push方法,我們在舊的方法中仍採用的是死等的方法。

但是方法中有很多死等,超時等,放棄任務,拋出異常,讓調用者自己執行任務等等方法。

我們就可用講其進行抽象,把操作交給調用者。

定義瞭如下的函數式介面,即為拒絕策略。

@FunctionalInterface
interface RejectPolicy<T>{
    void reject(TaskQueue<T> taskQueue,T task);
}

將在TaskQueue任務隊列中定義不同的策略,我們只要傳入這個函數式介面的實現對象就可用實現定製拒絕的策略。

在TaskQueue類添加一個方法,用來調用拒絕策略

public void tryAndAdd(T task,RejectPolicy rejectPolicy){
    lock.lock();
    try {
        if (deque.size() >= capacity) {
            rejectPolicy.reject(this,task);
        }else{
            log.debug("添加任務");
            deque.offerLast(task);
            emptyWaitSet.signal();
        }
    } finally {
        lock.unlock();
    }
}

更改了構造方法的線程池類,這樣就可用傳入一個自定義的拒絕策略。

@Slf4j
class ThreadPool {
    // 阻塞隊列大小
    private int capacity;
    // 阻塞隊列
    private TaskQueue<Runnable> taskQueue;
    // 工作線程
    private HashSet<Worker> workerSet = new HashSet<>();
    // 核心數
    private int coreNum;
    // 超時等待時間
    private long timeout;
    // 超時等待單位
    private TimeUnit unit;
    // 拒絕策略
    private RejectPolicy rejectPolicy;

    // 線程對象
    class Worker extends Thread {

        private Runnable task;

        public Worker(Runnable runnable) {
            this.task = runnable;
        }

        @Override
        public void run() {
            while (task != null || (task = taskQueue.poll(timeout,unit)) != null) {
                try {
                    log.debug("開始執行任務");
                    Thread.sleep(1000);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            workerSet.remove(this);
        }
    }


    public ThreadPool(int capacity, int coreNum, long timeout, TimeUnit unit,RejectPolicy rejectPolicy) {
        this.capacity = capacity;
        this.coreNum = coreNum;
        this.timeout = timeout;
        this.unit = unit;
        this.taskQueue = new TaskQueue<>(capacity);
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * 當線程數大於核心數,就將任務放入阻塞隊列
     * 否則創建線程進行處理
     *
     * @param runnable
     */
    public void execute(Runnable runnable) {
        synchronized (workerSet) {
            if (workerSet.size() >= coreNum) {
                taskQueue.tryAndAdd(runnable,rejectPolicy);
            } else {
                Worker worker = new Worker(runnable);
                workerSet.add(worker);
                worker.start();
            }
        }
    }
}

將啟動類修改如下

@Slf4j
public class MyThreadPool {

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(3,2,1,TimeUnit.SECONDS,(taskQueue,task)->{
            // 採用死等的方法,當然我們可用在taskQueue中定義更多的方法讓調用者選擇
            taskQueue.push(task);
        });
        for (int i = 0; i < 10; i++) {
            int j = i;
            threadPool.execute(() -> {
                log.debug("任務{}", j);
            });
        }
    }

}

這樣我們就完成了自定義的線程池。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 至此基本上vue2.0的內容全部結束,後面還有點elementUI和vue3.0的內容過幾天再來更新。 這幾天要回學校去參加畢業答辯,斷更幾天 一.相關理解 是vue的一個插件庫,專門用來實現spa(單頁面應用)的,也就是一直都是一個index.html頁面,他有他的導航區和展示區,雖然只有一個頁面 ...
  • markRaw 作用:標記一個對象,使其永遠不會再成為響應式對象 應用場景: 1.有些值不應被設置成響應式時,例如複雜的第三方類庫等 2.當渲染具有不可變數據源的大列表時,跳過響應式轉換可以提高性能 3.在動態渲染組件的時候我們就可以使用 markRaw 包裹。 markRaw 的使用場景 很多時候 ...
  • 在 CSS 中,其實存在各種各樣的函數。具體分為: Transform functions Math functions Filter functions Color functions Image functions Counter functions Font functions Shape f ...
  • 1.“new”有什麼不對勁? 在我們沒有接觸到工廠模式(簡單工廠、工廠方法模式、抽象工廠模式)之前,我們實例化對象唯一的方法就是通過“new”關鍵字來完成。但是,大量的使用“new”關鍵字來實例化對象會違背一些設計原則,因為代碼與具體的類型綁在一起,從而導致過多的依賴於細節而非抽象,這樣代碼就很難適 ...
  • STL初步認識:介紹了【什麼是STL】,以及【STL庫里的常見容器】,也對【迭代器】進行了簡短的說明。 ...
  • 一、if判斷語句 if語句是用來進行判斷的,其使用格式如下: if 要判斷的條件: 條件成立時要做的事 Demo age = input('輸入年齡:') #input返回的是字元串類型 if int(age) > 18: #這裡對age做強制類型轉換 字元串和整數int不可比較 print('你成 ...
  • JDK自帶線程池 線程池的狀態 線程有如下狀態 RUNNING狀態:Accept new tasks and process queued tasks SHUTDOWN狀態:Don't accept new tasks, but process queued tasks STOP狀態: Don't ...
  • 跟著教程寫了幾種方法,才發現自己寫的雖然能實現,但比較繁瑣。教程有三種方法: 1.移位法,每次左移一位,相比我自己寫的,優點是不用把每一種情況都寫出來。但是需要考慮左移到最後一位時需要自己再寫個賦值語句重新回到第一位。 2.位拼接法,迴圈左移,每一次都把最後一位放到第一位,其他六位左移一位,剋服了移 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 推薦一款基於.NET 8、WPF、Prism.DryIoc、MVVM設計模式、Blazor以及MySQL資料庫構建的企業級工作流系統的WPF客戶端框架-AIStudio.Wpf.AClient 6.0。 項目介紹 框架採用了 Prism 框架來實現 MVVM 模式,不僅簡化了 MVVM 的典型 ...
  • 先看一下效果吧: 我們直接通過改造一下原版的TreeView來實現上面這個效果 我們先創建一個普通的TreeView 代碼很簡單: <TreeView> <TreeViewItem Header="人事部"/> <TreeViewItem Header="技術部"> <TreeViewItem He ...
  • 1. 生成式 AI 簡介 https://imp.i384100.net/LXYmq3 2. Python 語言 https://imp.i384100.net/5gmXXo 3. 統計和 R https://youtu.be/ANMuuq502rE?si=hw9GT6JVzMhRvBbF 4. 數 ...
  • 本文為大家介紹下.NET解壓/壓縮zip文件。雖然解壓縮不是啥核心技術,但壓縮性能以及進度處理還是需要關註下,針對使用較多的zip開源組件驗證,給大家提供個技術選型參考 之前在《.NET WebSocket高併發通信阻塞問題 - 唐宋元明清2188 - 博客園 (cnblogs.com)》講過,團隊 ...
  • 之前寫過兩篇關於Roslyn源生成器生成源代碼的用例,今天使用Roslyn的代碼修複器CodeFixProvider實現一個cs文件頭部註釋的功能, 代碼修複器會同時涉及到CodeFixProvider和DiagnosticAnalyzer, 實現FileHeaderAnalyzer 首先我們知道修 ...
  • 在軟體行業,經常會聽到一句話“文不如表,表不如圖”說明瞭圖形在軟體應用中的重要性。同樣在WPF開發中,為了程式美觀或者業務需要,經常會用到各種個樣的圖形。今天以一些簡單的小例子,簡述WPF開發中幾何圖形(Geometry)相關內容,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 在 C# 中使用 RabbitMQ 通過簡訊發送重置後的密碼到用戶的手機號上,你可以按照以下步驟進行 1.安裝 RabbitMQ 客戶端庫 首先,確保你已經安裝了 RabbitMQ 客戶端庫。你可以通過 NuGet 包管理器來安裝: dotnet add package RabbitMQ.Clien ...
  • 1.下載 Protocol Buffers 編譯器(protoc) 前往 Protocol Buffers GitHub Releases 頁面。在 "Assets" 下找到適合您系統的壓縮文件,通常為 protoc-{version}-win32.zip 或 protoc-{version}-wi ...
  • 簡介 在現代微服務架構中,服務發現(Service Discovery)是一項關鍵功能。它允許微服務動態地找到彼此,而無需依賴硬編碼的地址。以前如果你搜 .NET Service Discovery,大概率會搜到一大堆 Eureka,Consul 等的文章。現在微軟為我們帶來了一個官方的包:Micr ...
  • ZY樹洞 前言 ZY樹洞是一個基於.NET Core開發的簡單的評論系統,主要用於大家分享自己心中的感悟、經驗、心得、想法等。 好了,不賣關子了,這個項目其實是上班無聊的時候寫的,為什麼要寫這個項目呢?因為我單純的想吐槽一下工作中的不滿而已。 項目介紹 項目很簡單,主要功能就是提供一個簡單的評論系統 ...