自己實現一個簡單的線程池 public interface ThreadPool { // 啟動 void execute(Job job); // 關閉 void shutDown(); // 增加線程 void addWorkThread(int num); // 減少線程 void reduc ...
自己實現一個簡單的線程池
public interface ThreadPool<Job extends Runnable> {
// 啟動
void execute(Job job);
// 關閉
void shutDown();
// 增加線程
void addWorkThread(int num);
// 減少線程
void reduceWorkThread(int num) throws Exception;
// 正在執行的線程數
int getSize();
}
實現類
public class MyThreadPoll<Job extends Runnable> implements ThreadPool<Job> {
/**
* 最大線程數
*/
private static final int MAX_WORK_THREAD = 10;
/**
* 最小線程數
*/
private static final int MIN_WORK_THREAD = 1;
/**
* 預設的線程數
*/
private static final int DEFAULT_WORK_THREAD = 5;
/**
* 工作列表(無序)
*/
private final LinkedList<Job> jobQueue = new LinkedList<>();
/**
* 工作者線程
*/
private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());
/**
* 工作線程數
*/
private int workerNum = DEFAULT_WORK_THREAD;
/**
* 線程編號
*/
private AtomicLong threadNum = new AtomicLong();
public MyThreadPoll() {
initWorks(DEFAULT_WORK_THREAD);
}
public MyThreadPoll(int num) {
if (num > MAX_WORK_THREAD) {
workerNum = MAX_WORK_THREAD;
} else if (workerNum < MIN_WORK_THREAD) {
workerNum = MIN_WORK_THREAD;
} else {
workerNum = num;
}
initWorks(workerNum);
}
/**
* 初始化工作線程
*
* @param num
*/
private void initWorks(int num) {
for (int i = 0; i < num; i++) {
Worker worker = new Worker();
workers.add(worker);
Thread thread =
new Thread(worker, "THPOLL-WORKER-" + threadNum.incrementAndGet());
thread.start();
}
}
@Override
public void execute(Job job) {
if (job != null) {
synchronized (jobQueue) {
// 加入工作線程隊列
jobQueue.add(job);
// 嘗試喚醒線程
jobQueue.notify();
}
}
}
@Override
public void shutDown() {
// 線程關閉迴圈
for (Worker worker : workers) {
worker.shutDown();
}
// 全部喚醒
synchronized (jobQueue) {
jobQueue.notifyAll();
}
}
@Override
public void addWorkThread(int num) {
synchronized (jobQueue) {
if (num + this.workerNum > MAX_WORK_THREAD) {
num = MAX_WORK_THREAD - this.workerNum;
}
initWorks(num);
this.workerNum += num;
}
}
@Override
public void reduceWorkThread(int num) throws Exception {
synchronized (jobQueue) {
if (num >= this.workerNum) {
throw new Exception();
}
int count = num;
int succCount = 0;
while (count > 0) {
Worker worker = workers.get(count);
if (workers.remove(worker)) {
worker.shutDown();
count--;
succCount++;
}
}
this.workerNum -= succCount;
}
}
@Override
public int getSize() {
return jobQueue.size();
}
private class Worker implements Runnable {
private volatile boolean running = true;
@Override
public void run() {
while (running) {
Job job = null;
synchronized (jobQueue) {
while (jobQueue.isEmpty() && running) {
try {
jobQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
return;
}
}
if (!jobQueue.isEmpty()) {
job = jobQueue.removeFirst();
}
}
// 如果此時線程池已經被關閉,則忽略所有任務
// 現實情況可能有其他操作
if (job != null && running) {
try {
job.run();
System.out.println("JOB=" + Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
public void shutDown() {
running = false;
}
}
}