JUC自定義線程池練習 首先上面該線程池的大致流程 自定義阻塞隊列 首先定義一個雙向的隊列和鎖一定兩個等待的condition 本類用lock來控制多線程下的流程執行 take和push方法就是死等,調用await就是等,後面優化為限時等待 take調用後取出阻塞隊列的task後會調用fullWai ...
JUC自定義線程池練習
首先上面該線程池的大致流程
自定義阻塞隊列
- 首先定義一個雙向的隊列和鎖一定兩個等待的condition
- 本類用lock來控制多線程下的流程執行
- take和push方法就是死等,調用await就是等,後面優化為限時等待
- take調用後取出阻塞隊列的task後會調用fullWaitSet的signal方法來喚醒因為阻塞隊列滿了的線程將task放入阻塞隊列。
@Slf4j
class TaskQueue<T> {
// 雙向的阻塞隊列
private Deque<T> deque;
// 隊列最大容量
private int capacity;
// 鎖
private ReentrantLock lock = new ReentrantLock();
// 消費者任務池空的等待隊列
private Condition emptyWaitSet = lock.newCondition();
// 生產者任務池滿的等待隊列
private Condition fullWaitSet = lock.newCondition();
public TaskQueue(int capacity) {
this.capacity = capacity;
deque = new ArrayDeque<>(capacity);
}
// 死等take,即從阻塞隊列取出任務
public T take() {
lock.lock();
try {
while (deque.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("取走任務");
T task = deque.pollFirst();
fullWaitSet.signal();
return task;
} finally {
lock.unlock();
}
}
// 線程添加任務,屬於是死等添加
public void push(T task) {
lock.lock();
try {
while (deque.size() >= capacity) {
try {
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("添加任務");
deque.offerLast(task);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
public int getSize() {
lock.lock();
try {
return deque.size();
}finally {
lock.unlock();
}
}
}
優化,死等優化為超時等
- awaitNanos方法返回的是等待的剩餘時間,如果已經等了base時間就會返回0,如果沒有就會返回大於0即還沒有等待的時間,防止虛假喚醒導致重新等待時間加長。當然在本題的設計中不會出現虛假喚醒的情況。
public T poll(Long timeout,TimeUnit unit) {
lock.lock();
try {
long base = unit.toNanos(timeout);
while (deque.isEmpty()) {
try {
if (base <= 0){
return null;
}
base = emptyWaitSet.awaitNanos(base); // 返回還剩下的時間
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("取走任務");
T task = deque.pollFirst();
fullWaitSet.signal();
return task;
} finally {
lock.unlock();
}
}
線程池類
- 成員變數如下,對於Worker就工作線程
@Slf4j
class ThreadPool {
// 阻塞隊列大小
private int capacity;
// 阻塞隊列
private TaskQueue<Runnable> taskQueue;
// 工作線程
private HashSet<Worker> workerSet = new HashSet<>();
// 核心數
private int coreNum;
// 超時等待時間
private long timeout;
// 超時等待單位
private TimeUnit unit;
// 拒絕策略
private RejectPolicy rejectPolicy;
// 線程對象
class Worker extends Thread {
private Runnable task;
public Worker(Runnable runnable) {
this.task = runnable;
}
@Override
public void run() {
// 就是線程把當前分配的任務做完,然後還要去阻塞隊列找活乾,沒活就退出
// taks 如果不為空就執行然後講其置為空,後續再次進入迴圈後會從阻塞隊列中再次取出task,
// 如果不為空就繼續執行,但是因為take死等,會導致無法結束
// 使用了這個超時等的方法,當無法取出時就會退出程式
while (task != null || (task = taskQueue.poll(timeout,unit)) != null) {
try {
log.debug("開始執行任務");
Thread.sleep(1000);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
// 當沒有任務可執行,線程自動銷毀,由於這是根據對象來銷毀,且hashset無序,所以這裡無需保證其的線程安全。
workerSet.remove(this);
}
}
public ThreadPool(int capacity, int coreNum, long timeout, TimeUnit unit,RejectPolicy rejectPolicy) {
this.capacity = capacity;
this.coreNum = coreNum;
this.timeout = timeout;
this.unit = unit;
this.taskQueue = new TaskQueue<>(capacity);
this.rejectPolicy = rejectPolicy;
}
/**
* 當線程數大於核心數,就將任務放入阻塞隊列
* 否則創建線程進行處理
*
* @param runnable
*/
public void execute(Runnable runnable) {
// 需要synchronized關鍵字控制多線程下對執行方法的執行,保證共用變數workerSet安全。
synchronized (workerSet) {
// 如果已經存在的工作線程已經大於核心數,就不適合在進行創建線程了,創太多線程對於執行並不會加快,反而會因為線程不斷切換而拖累CPU的執行。
if (workerSet.size() >= coreNum) {
taskQueue.push(runnable);
} else {
// 如果工作線程小於核心數就可創建一個worker線程來工作
Worker worker = new Worker(runnable);
workerSet.add(worker);
worker.start();
}
}
}
}
測試類
@Slf4j
public class MyThreadPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(3,2,1,TimeUnit.SECONDS,(taskQueue,task)->{
taskQueue.push(task);
});
for (int i = 0; i < 10; i++) {
int j = i;
threadPool.execute(() -> {
log.debug("任務{}", j);
});
}
}
}
優化---拒絕策略
我們沒有進行優化的就是當任務太多導致阻塞線程也滿了,此時任務線程就會進行阻塞,直到等到有人線上程池中取走任務。也就是push方法,我們在舊的方法中仍採用的是死等的方法。
但是方法中有很多死等,超時等,放棄任務,拋出異常,讓調用者自己執行任務等等方法。
我們就可用講其進行抽象,把操作交給調用者。
定義瞭如下的函數式介面,即為拒絕策略。
@FunctionalInterface
interface RejectPolicy<T>{
void reject(TaskQueue<T> taskQueue,T task);
}
將在TaskQueue任務隊列中定義不同的策略,我們只要傳入這個函數式介面的實現對象就可用實現定製拒絕的策略。
在TaskQueue類添加一個方法,用來調用拒絕策略
public void tryAndAdd(T task,RejectPolicy rejectPolicy){
lock.lock();
try {
if (deque.size() >= capacity) {
rejectPolicy.reject(this,task);
}else{
log.debug("添加任務");
deque.offerLast(task);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
}
更改了構造方法的線程池類,這樣就可用傳入一個自定義的拒絕策略。
@Slf4j
class ThreadPool {
// 阻塞隊列大小
private int capacity;
// 阻塞隊列
private TaskQueue<Runnable> taskQueue;
// 工作線程
private HashSet<Worker> workerSet = new HashSet<>();
// 核心數
private int coreNum;
// 超時等待時間
private long timeout;
// 超時等待單位
private TimeUnit unit;
// 拒絕策略
private RejectPolicy rejectPolicy;
// 線程對象
class Worker extends Thread {
private Runnable task;
public Worker(Runnable runnable) {
this.task = runnable;
}
@Override
public void run() {
while (task != null || (task = taskQueue.poll(timeout,unit)) != null) {
try {
log.debug("開始執行任務");
Thread.sleep(1000);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
workerSet.remove(this);
}
}
public ThreadPool(int capacity, int coreNum, long timeout, TimeUnit unit,RejectPolicy rejectPolicy) {
this.capacity = capacity;
this.coreNum = coreNum;
this.timeout = timeout;
this.unit = unit;
this.taskQueue = new TaskQueue<>(capacity);
this.rejectPolicy = rejectPolicy;
}
/**
* 當線程數大於核心數,就將任務放入阻塞隊列
* 否則創建線程進行處理
*
* @param runnable
*/
public void execute(Runnable runnable) {
synchronized (workerSet) {
if (workerSet.size() >= coreNum) {
taskQueue.tryAndAdd(runnable,rejectPolicy);
} else {
Worker worker = new Worker(runnable);
workerSet.add(worker);
worker.start();
}
}
}
}
將啟動類修改如下
@Slf4j
public class MyThreadPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(3,2,1,TimeUnit.SECONDS,(taskQueue,task)->{
// 採用死等的方法,當然我們可用在taskQueue中定義更多的方法讓調用者選擇
taskQueue.push(task);
});
for (int i = 0; i < 10; i++) {
int j = i;
threadPool.execute(() -> {
log.debug("任務{}", j);
});
}
}
}
這樣我們就完成了自定義的線程池。