JUC自定義線程池練習

来源:https://www.cnblogs.com/duizhangz/archive/2022/05/11/16259337.html
-Advertisement-
Play Games

JUC自定義線程池練習 首先上面該線程池的大致流程 自定義阻塞隊列 首先定義一個雙向的隊列和鎖一定兩個等待的condition 本類用lock來控制多線程下的流程執行 take和push方法就是死等,調用await就是等,後面優化為限時等待 take調用後取出阻塞隊列的task後會調用fullWai ...


JUC自定義線程池練習

image

首先上面該線程池的大致流程

自定義阻塞隊列

  • 首先定義一個雙向的隊列和鎖一定兩個等待的condition
  • 本類用lock來控制多線程下的流程執行
  • take和push方法就是死等,調用await就是等,後面優化為限時等待
  • take調用後取出阻塞隊列的task後會調用fullWaitSet的signal方法來喚醒因為阻塞隊列滿了的線程將task放入阻塞隊列。
@Slf4j
class TaskQueue<T> {

    // 雙向的阻塞隊列
    private Deque<T> deque;
    // 隊列最大容量
    private int capacity;

    // 鎖
    private ReentrantLock lock = new ReentrantLock();
    // 消費者任務池空的等待隊列
    private Condition emptyWaitSet = lock.newCondition();
    // 生產者任務池滿的等待隊列
    private Condition fullWaitSet = lock.newCondition();


    public TaskQueue(int capacity) {
        this.capacity = capacity;
        deque = new ArrayDeque<>(capacity);
    }

    // 死等take,即從阻塞隊列取出任務
    public T take() {
        lock.lock();
        try {
            while (deque.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("取走任務");
            T task = deque.pollFirst();
            fullWaitSet.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    // 線程添加任務,屬於是死等添加
    public void push(T task) {
        lock.lock();
        try {
            while (deque.size() >= capacity) {
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("添加任務");
            deque.offerLast(task);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    public int getSize() {
        lock.lock();
        try {
            return deque.size();
        }finally {
            lock.unlock();
        }
    }

}

優化,死等優化為超時等

  • awaitNanos方法返回的是等待的剩餘時間,如果已經等了base時間就會返回0,如果沒有就會返回大於0即還沒有等待的時間,防止虛假喚醒導致重新等待時間加長。當然在本題的設計中不會出現虛假喚醒的情況。
public T poll(Long timeout,TimeUnit unit) {
    lock.lock();
    try {
        long base = unit.toNanos(timeout);
        while (deque.isEmpty()) {
            try {
                if (base <= 0){
                    return null;
                }
                base = emptyWaitSet.awaitNanos(base);  // 返回還剩下的時間
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        log.debug("取走任務");
        T task = deque.pollFirst();
        fullWaitSet.signal();
        return task;
    } finally {
        lock.unlock();
    }
}

線程池類

  • 成員變數如下,對於Worker就工作線程
@Slf4j
class ThreadPool {
    // 阻塞隊列大小
    private int capacity;
    // 阻塞隊列
    private TaskQueue<Runnable> taskQueue;
    // 工作線程
    private HashSet<Worker> workerSet = new HashSet<>();
    // 核心數
    private int coreNum;
    // 超時等待時間
    private long timeout;
    // 超時等待單位
    private TimeUnit unit;
    // 拒絕策略
    private RejectPolicy rejectPolicy;

    // 線程對象
    class Worker extends Thread {

        private Runnable task;

        public Worker(Runnable runnable) {
            this.task = runnable;
        }

        @Override
        public void run() {
            // 就是線程把當前分配的任務做完,然後還要去阻塞隊列找活乾,沒活就退出
            // taks 如果不為空就執行然後講其置為空,後續再次進入迴圈後會從阻塞隊列中再次取出task,
            // 如果不為空就繼續執行,但是因為take死等,會導致無法結束
            // 使用了這個超時等的方法,當無法取出時就會退出程式
            while (task != null || (task = taskQueue.poll(timeout,unit)) != null) {
                try {
                    log.debug("開始執行任務");
                    Thread.sleep(1000);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            // 當沒有任務可執行,線程自動銷毀,由於這是根據對象來銷毀,且hashset無序,所以這裡無需保證其的線程安全。
            workerSet.remove(this);
        }
    }


    public ThreadPool(int capacity, int coreNum, long timeout, TimeUnit unit,RejectPolicy rejectPolicy) {
        this.capacity = capacity;
        this.coreNum = coreNum;
        this.timeout = timeout;
        this.unit = unit;
        this.taskQueue = new TaskQueue<>(capacity);
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * 當線程數大於核心數,就將任務放入阻塞隊列
     * 否則創建線程進行處理
     *
     * @param runnable
     */
    public void execute(Runnable runnable) {
        // 需要synchronized關鍵字控制多線程下對執行方法的執行,保證共用變數workerSet安全。
        synchronized (workerSet) {
            // 如果已經存在的工作線程已經大於核心數,就不適合在進行創建線程了,創太多線程對於執行並不會加快,反而會因為線程不斷切換而拖累CPU的執行。
            if (workerSet.size() >= coreNum) {
                taskQueue.push(runnable);
            } else {
                // 如果工作線程小於核心數就可創建一個worker線程來工作
                Worker worker = new Worker(runnable);
                workerSet.add(worker);
                worker.start();
            }
        }
    }
}

測試類

@Slf4j
public class MyThreadPool {

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(3,2,1,TimeUnit.SECONDS,(taskQueue,task)->{
            taskQueue.push(task);
        });
        for (int i = 0; i < 10; i++) {
            int j = i;
            threadPool.execute(() -> {
                log.debug("任務{}", j);
            });
        }
    }

}

優化---拒絕策略

我們沒有進行優化的就是當任務太多導致阻塞線程也滿了,此時任務線程就會進行阻塞,直到等到有人線上程池中取走任務。也就是push方法,我們在舊的方法中仍採用的是死等的方法。

但是方法中有很多死等,超時等,放棄任務,拋出異常,讓調用者自己執行任務等等方法。

我們就可用講其進行抽象,把操作交給調用者。

定義瞭如下的函數式介面,即為拒絕策略。

@FunctionalInterface
interface RejectPolicy<T>{
    void reject(TaskQueue<T> taskQueue,T task);
}

將在TaskQueue任務隊列中定義不同的策略,我們只要傳入這個函數式介面的實現對象就可用實現定製拒絕的策略。

在TaskQueue類添加一個方法,用來調用拒絕策略

public void tryAndAdd(T task,RejectPolicy rejectPolicy){
    lock.lock();
    try {
        if (deque.size() >= capacity) {
            rejectPolicy.reject(this,task);
        }else{
            log.debug("添加任務");
            deque.offerLast(task);
            emptyWaitSet.signal();
        }
    } finally {
        lock.unlock();
    }
}

更改了構造方法的線程池類,這樣就可用傳入一個自定義的拒絕策略。

@Slf4j
class ThreadPool {
    // 阻塞隊列大小
    private int capacity;
    // 阻塞隊列
    private TaskQueue<Runnable> taskQueue;
    // 工作線程
    private HashSet<Worker> workerSet = new HashSet<>();
    // 核心數
    private int coreNum;
    // 超時等待時間
    private long timeout;
    // 超時等待單位
    private TimeUnit unit;
    // 拒絕策略
    private RejectPolicy rejectPolicy;

    // 線程對象
    class Worker extends Thread {

        private Runnable task;

        public Worker(Runnable runnable) {
            this.task = runnable;
        }

        @Override
        public void run() {
            while (task != null || (task = taskQueue.poll(timeout,unit)) != null) {
                try {
                    log.debug("開始執行任務");
                    Thread.sleep(1000);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            workerSet.remove(this);
        }
    }


    public ThreadPool(int capacity, int coreNum, long timeout, TimeUnit unit,RejectPolicy rejectPolicy) {
        this.capacity = capacity;
        this.coreNum = coreNum;
        this.timeout = timeout;
        this.unit = unit;
        this.taskQueue = new TaskQueue<>(capacity);
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * 當線程數大於核心數,就將任務放入阻塞隊列
     * 否則創建線程進行處理
     *
     * @param runnable
     */
    public void execute(Runnable runnable) {
        synchronized (workerSet) {
            if (workerSet.size() >= coreNum) {
                taskQueue.tryAndAdd(runnable,rejectPolicy);
            } else {
                Worker worker = new Worker(runnable);
                workerSet.add(worker);
                worker.start();
            }
        }
    }
}

將啟動類修改如下

@Slf4j
public class MyThreadPool {

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(3,2,1,TimeUnit.SECONDS,(taskQueue,task)->{
            // 採用死等的方法,當然我們可用在taskQueue中定義更多的方法讓調用者選擇
            taskQueue.push(task);
        });
        for (int i = 0; i < 10; i++) {
            int j = i;
            threadPool.execute(() -> {
                log.debug("任務{}", j);
            });
        }
    }

}

這樣我們就完成了自定義的線程池。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 至此基本上vue2.0的內容全部結束,後面還有點elementUI和vue3.0的內容過幾天再來更新。 這幾天要回學校去參加畢業答辯,斷更幾天 一.相關理解 是vue的一個插件庫,專門用來實現spa(單頁面應用)的,也就是一直都是一個index.html頁面,他有他的導航區和展示區,雖然只有一個頁面 ...
  • markRaw 作用:標記一個對象,使其永遠不會再成為響應式對象 應用場景: 1.有些值不應被設置成響應式時,例如複雜的第三方類庫等 2.當渲染具有不可變數據源的大列表時,跳過響應式轉換可以提高性能 3.在動態渲染組件的時候我們就可以使用 markRaw 包裹。 markRaw 的使用場景 很多時候 ...
  • 在 CSS 中,其實存在各種各樣的函數。具體分為: Transform functions Math functions Filter functions Color functions Image functions Counter functions Font functions Shape f ...
  • 1.“new”有什麼不對勁? 在我們沒有接觸到工廠模式(簡單工廠、工廠方法模式、抽象工廠模式)之前,我們實例化對象唯一的方法就是通過“new”關鍵字來完成。但是,大量的使用“new”關鍵字來實例化對象會違背一些設計原則,因為代碼與具體的類型綁在一起,從而導致過多的依賴於細節而非抽象,這樣代碼就很難適 ...
  • STL初步認識:介紹了【什麼是STL】,以及【STL庫里的常見容器】,也對【迭代器】進行了簡短的說明。 ...
  • 一、if判斷語句 if語句是用來進行判斷的,其使用格式如下: if 要判斷的條件: 條件成立時要做的事 Demo age = input('輸入年齡:') #input返回的是字元串類型 if int(age) > 18: #這裡對age做強制類型轉換 字元串和整數int不可比較 print('你成 ...
  • JDK自帶線程池 線程池的狀態 線程有如下狀態 RUNNING狀態:Accept new tasks and process queued tasks SHUTDOWN狀態:Don't accept new tasks, but process queued tasks STOP狀態: Don't ...
  • 跟著教程寫了幾種方法,才發現自己寫的雖然能實現,但比較繁瑣。教程有三種方法: 1.移位法,每次左移一位,相比我自己寫的,優點是不用把每一種情況都寫出來。但是需要考慮左移到最後一位時需要自己再寫個賦值語句重新回到第一位。 2.位拼接法,迴圈左移,每一次都把最後一位放到第一位,其他六位左移一位,剋服了移 ...
一周排行
    -Advertisement-
    Play Games
  • 隨著Aspire發佈preview5的發佈,Microsoft.Extensions.ServiceDiscovery隨之更新, 服務註冊發現這個屬於老掉牙的話題解決什麼問題就不贅述了,這裡主要講講Microsoft.Extensions.ServiceDiscovery(preview5)以及如何 ...
  • 概述:通過使用`SemaphoreSlim`,可以簡單而有效地限制非同步HTTP請求的併發量,確保在任何給定時間內不超過20個網頁同時下載。`ParallelOptions`不適用於非同步操作,但可考慮使用`Parallel.ForEach`,儘管在非同步場景中謹慎使用。 對於併發非同步 I/O 操作的數量 ...
  • 1.Linux上安裝Docken 伺服器系統版本以及內核版本:cat /etc/redhat-release 查看伺服器內核版本:uname -r 安裝依賴包:yum install -y yum-utils device-mapper-persistent-data lvm2 設置阿裡雲鏡像源:y ...
  • 概述:WPF界面綁定和渲染大量數據可能導致性能問題。通過啟用UI虛擬化、非同步載入和數據分頁,可以有效提高界面響應性能。以下是簡單示例演示這些優化方法。 在WPF中,當你嘗試綁定和渲染大量的數據項時,性能問題可能出現。以下是一些可能導致性能慢的原因以及優化方法: UI 虛擬化: WPF提供了虛擬化技術 ...
  • 引言 上一章節介紹了 TDD 的三大法則,今天我們講一下在單元測試中模擬對象的使用。 Fake Fake - Fake 是一個通用術語,可用於描述 stub或 mock 對象。 它是 stub 還是 mock 取決於使用它的上下文。 也就是說,Fake 可以是 stub 或 mock Mock - ...
  • 為.net6在CentOS7上面做準備,先在vmware虛擬機安裝CentOS 7.9 新建CentOS764位的系統 因為CentOS8不更新了,所以安裝7;簡單就一筆帶過了 選擇下載好的操作系統的iso文件,下載地址https://mirrors.aliyun.com/centos/7.9.20 ...
  • 經過前面幾篇的學習,我們瞭解到指令的大概分類,如:參數載入指令,該載入指令以 Ld 開頭,將參數載入到棧中,以便於後續執行操作命令。參數存儲指令,其指令以 St 開頭,將棧中的數據,存儲到指定的變數中,以方便後續使用。創建實例指令,其指令以 New 開頭,用於在運行時動態生成並初始化對象。方法調用指... ...
  • LiteDB 是一個輕量級的嵌入式 NoSQL 資料庫,其設計理念與 MongoDB 類似,但它是完全使用 C# 開發的,因此與 C# 應用程式的集成非常順暢。與 SQLite 相比,LiteDB 提供了 NoSQL(即鍵值對)的數據存儲方式,並且是一個開源且免費的項目。它適用於桌面、移動以及 We ...
  • 1 開源解析和拆分文檔 第三方的工具去對文件解析拆分,去將我們的文件內容給提取出來,並將我們的文檔內容去拆分成一個小的chunk。常見的PDF word mark down, JSON、HTML。都可以有很好的一些模塊去把這些文件去進行一個東西去提取。 優勢 支持豐富的文檔類型 每種文檔多樣化選擇 ...
  • OOM是什麼?英文全稱為 OutOfMemoryError(記憶體溢出錯誤)。當程式發生OOM時,如何去定位導致異常的代碼還是挺麻煩的。 要檢查OOM發生的原因,首先需要瞭解各種OOM情況下會報的異常信息。這樣能縮小排查範圍,再結合異常堆棧、heapDump文件、JVM分析工具和業務代碼來判斷具體是哪 ...