一、簡介 在 Java 多線程編程中,還有一個非常重要的設計模式,它就是:生產者和消費者模型。 這種模型可以充分發揮 cpu 的多線程特性,通過一些平衡手段能有效的提升系統整體處理數據的速度,減輕系統負載,提高程式的效率和穩定性,同時實現模塊之間的解耦。 那什麼是生產者和消費者模型呢? 簡單的說,生 ...
一、簡介
在 Java 多線程編程中,還有一個非常重要的設計模式,它就是:生產者和消費者模型。
這種模型可以充分發揮 cpu 的多線程特性,通過一些平衡手段能有效的提升系統整體處理數據的速度,減輕系統負載,提高程式的效率和穩定性,同時實現模塊之間的解耦。
那什麼是生產者和消費者模型呢?
簡單的說,生產者和消費者之間不直接進行交互,而是通過一個緩衝區來進行交互,生產者負責生成數據,然後存入緩衝區;消費者則負責處理數據,從緩衝區獲取。
大致流程圖如下:
對於最簡單的生產者和消費者模型,總結下來,大概有以下幾個特點:
- 緩衝區為空的時候,消費者不能消費,會進入休眠狀態,直到有新數據進入緩衝區,再次被喚醒
- 緩衝區填滿的時候,生產者不能生產,也會進入休眠狀態,直到緩衝區有空間,再次被喚醒
生產者和消費者模型作為一個非常重要的設計模型,它的優點在於:
- 解耦:生產者和消費者之間不直接進行交互,即使生產者和消費者的代碼發生變化,也不會對對方產生影響
- 消峰:例如在某項工作中,假如 A 操作生產數據的速度很快,B 操作處理速度很慢,那麼 A 操作就必須等待 B 操作完成才能結束,反之亦然。如果將 A 操作和B 操作進行解耦,中間插入一個緩衝區,這樣 A 操作將生產的數據存入緩衝區,就接受了;B 操作從緩衝區獲取數據併進行處理,平衡好 A 操作和 B 操作之間的緩衝區,可以顯著提升系統的數據處理能力
生產者和消費者模型的應用場景非常多,例如 Java 的線程池任務執行框架、消息中間件 rabbitMQ 等,因此掌握生產者和消費者模型,對於開發者至關重要。
下麵我們通過幾個案例,一起來瞭解一下生產者和消費者設計模型的實踐思路。
二、代碼實踐
2.1、利用 wait / notify 方法實現思路
生產者和消費者模型,最簡單的一種技術實踐方案就是基於線程的 wait() / notify() 方法,也就是通知和喚醒機制,可以將兩個操作實現解耦,具體代碼實踐如下。
/**
* 緩衝區容器類
*/
public class Container {
/**
* 緩衝區最大容量
*/
private int capacity = 3;
/**
* 緩衝區
*/
private LinkedList<Integer> list = new LinkedList<Integer>();
/**
* 添加數據到緩衝區
* @param value
*/
public synchronized void add(Integer value) {
if(list.size() >= capacity){
System.out.println("生產者:"+ Thread.currentThread().getName()+",緩衝區已滿,生產者進入waiting...");
try {
// 進入等待狀態
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("生產者:"+ Thread.currentThread().getName()+",add:" + value);
list.add(value);
//喚醒其他所有處於wait()的線程,包括消費者和生產者
notifyAll();
}
/**
* 從緩衝區獲取數據
*/
public synchronized void get() {
if(list.size() == 0){
System.out.println("消費者:"+ Thread.currentThread().getName()+",緩衝區為空,消費者進入waiting...");
try {
// 進入等待狀態
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 從頭部獲取數據,並移除元素
Integer val = list.removeFirst();
System.out.println("消費者:"+ Thread.currentThread().getName()+",value:" + val);
//喚醒其他所有處於wait()的線程,包括消費者和生產者
notifyAll();
}
}
/**
* 生產者類
*/
public class Producer extends Thread{
private Container container;
public Producer(Container container) {
this.container = container;
}
@Override
public void run() {
for (int i = 0; i < 6; i++) {
container.add(i);
}
}
}
/**
* 消費者類
*/
public class Consumer extends Thread{
private Container container;
public Consumer(Container container) {
this.container = container;
}
@Override
public void run() {
for (int i = 0; i < 6; i++) {
container.get();
}
}
}
/**
* 測試類
*/
public class MyThreadTest {
public static void main(String[] args) {
Container container = new Container();
Producer producer = new Producer(container);
Consumer consumer = new Consumer(container);
producer.start();
consumer.start();
}
}
運行結果如下:
生產者:Thread-0,add:0
生產者:Thread-0,add:1
生產者:Thread-0,add:2
生產者:Thread-0,緩衝區已滿,生產者進入waiting...
消費者:Thread-1,value:0
消費者:Thread-1,value:1
消費者:Thread-1,value:2
消費者:Thread-1,緩衝區為空,消費者進入waiting...
生產者:Thread-0,add:3
生產者:Thread-0,add:4
生產者:Thread-0,add:5
消費者:Thread-1,value:3
消費者:Thread-1,value:4
消費者:Thread-1,value:5
從日誌上可以很清晰的看到,生產者線程生產一批數據之後,當緩衝區已經滿了,會進入等待狀態,此時會通知消費者線程;消費者線程處理完數據之後,當緩衝區沒有數據時,也會進入等待狀態,再次通知生產者線程。
2.2、利用 await / signal 方法實現思路
除此之外,我們還可以利用ReentrantLock
和Condition
類中的 await() / signal() 方法實現生產者和消費者模型。
緩衝區容器類,具體代碼實踐如下。
/**
* 緩衝區容器類
*/
public class Container {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private int capacity = 3;
private LinkedList<Integer> list = new LinkedList<Integer>();
/**
* 添加數據到緩衝區
* @param value
*/
public void add(Integer value) {
boolean flag = false;
try {
flag = lock.tryLock(3, TimeUnit.SECONDS);
if(list.size() >= capacity){
System.out.println("生產者:"+ Thread.currentThread().getName()+",緩衝區已滿,生產者進入waiting...");
// 進入等待狀態
condition.await();
}
System.out.println("生產者:"+ Thread.currentThread().getName()+",add:" + value);
list.add(value);
//喚醒其他所有處於wait()的線程,包括消費者和生產者
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
if(flag){
lock.unlock();
}
}
}
/**
* 從緩衝區獲取數據
*/
public void get() {
boolean flag = false;
try {
flag = lock.tryLock(3, TimeUnit.SECONDS);
if(list.size() == 0){
System.out.println("消費者:"+ Thread.currentThread().getName()+",緩衝區為空,消費者進入waiting...");
// 進入等待狀態
condition.await();
}
// 從頭部獲取數據,並移除元素
Integer val = list.removeFirst();
System.out.println("消費者:"+ Thread.currentThread().getName()+",value:" + val);
//喚醒其他所有處於wait()的線程,包括消費者和生產者
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
if(flag){
lock.unlock();
}
}
}
}
生產者、消費者、測試類代碼,跟上文一致,運行結果和上文介紹的也是一樣。
2.3、多生產者和消費者的實現思路
上面介紹的都是一個生產者線程和一個消費者線程,模型比較簡單。實際上,在業務開發中,經常會出現多個生產者線程和多個消費者線程,按照以上的實現思路,會出現什麼情況呢?
有可能會出現程式假死現象!下麵我們來分析一下案例,假如有兩個生產者線程 a1、a2,兩個消費者線程 b1、b2,執行過程如下:
- 1.生產者線程 a1 執行生產數據的操作,發現緩衝區數據已經填滿了,然後進入等待階段,同時向外發起通知,喚醒其它線程
- 2.因為線程喚醒具有隨機性,本應該喚醒消費者線程 b1,結果可能生產者線程 a2 被喚醒,檢查緩衝區數據已經填滿了,又進入等待階段,緊接向外發起通知,消費者線程得不到被執行的機會
- 3.消費者線程 b1、b2,也有可能會出現這個現象,本應該喚醒生產者線程,結果喚醒了消費者線程
遇到這種情況,應該如何解決呢?
因為ReentrantLock
和Condition
的結合,編程具有高度靈活性,我們可以採用這種組合解決多生產者和多消費者中的假死問題。
具體實現邏輯如下:
/**
* 緩衝區容器類
*/
public class ContainerDemo {
private Lock lock = new ReentrantLock();
private Condition producerCondition = lock.newCondition();
private Condition consumerCondition = lock.newCondition();
private int capacity = 3;
private LinkedList<Integer> list = new LinkedList<Integer>();
/**
* 添加數據到緩衝區
* @param value
*/
public void add(Integer value) {
boolean flag = false;
try {
flag = lock.tryLock(3, TimeUnit.SECONDS);
if(list.size() >= capacity){
System.out.println("生產者:"+ Thread.currentThread().getName()+",緩衝區已滿,生產者進入waiting...");
// 生產者進入等待狀態
producerCondition.await();
}
System.out.println("生產者:"+ Thread.currentThread().getName()+",add:" + value);
list.add(value);
// 喚醒所有消費者處於wait()的線程
consumerCondition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
if(flag){
lock.unlock();
}
}
}
/**
* 從緩衝區獲取數據
*/
public void get() {
boolean flag = false;
try {
flag = lock.tryLock(3, TimeUnit.SECONDS);
if(list.size() == 0){
System.out.println("消費者:"+ Thread.currentThread().getName()+",緩衝區為空,消費者進入waiting...");
// 消費者進入等待狀態
consumerCondition.await();
}
// 從頭部獲取數據,並移除元素
Integer val = list.removeFirst();
System.out.println("消費者:"+ Thread.currentThread().getName()+",value:" + val);
// 喚醒所有生產者處於wait()的線程
producerCondition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
if(flag){
lock.unlock();
}
}
}
}
/**
* 生產者
*/
public class Producer extends Thread{
private ContainerDemo container;
private Integer value;
public Producer(ContainerDemo container, Integer value) {
this.container = container;
this.value = value;
}
@Override
public void run() {
container.add(value);
}
}
/**
* 消費者
*/
public class Consumer extends Thread{
private ContainerDemo container;
public Consumer(ContainerDemo container) {
this.container = container;
}
@Override
public void run() {
container.get();
}
}
/**
* 測試類
*/
public class MyThreadTest {
public static void main(String[] args) {
ContainerDemo container = new ContainerDemo();
List<Thread> threadList = new ArrayList<>();
// 初始化6個生產者線程
for (int i = 0; i < 6; i++) {
threadList.add(new Producer(container, i));
}
// 初始化6個消費者線程
for (int i = 0; i < 6; i++) {
threadList.add(new Consumer(container));
}
// 啟動線程
for (Thread thread : threadList) {
thread.start();
}
}
}
運行結果如下:
生產者:Thread-0,add:0
生產者:Thread-1,add:1
生產者:Thread-2,add:2
生產者:Thread-3,緩衝區已滿,生產者進入waiting...
生產者:Thread-4,緩衝區已滿,生產者進入waiting...
生產者:Thread-5,緩衝區已滿,生產者進入waiting...
消費者:Thread-6,value:0
消費者:Thread-7,value:1
生產者:Thread-3,add:3
生產者:Thread-4,add:4
生產者:Thread-5,add:5
消費者:Thread-8,value:2
消費者:Thread-9,value:3
消費者:Thread-10,value:4
消費者:Thread-11,value:5
通過ReentrantLock
定義兩個Condition
,一個表示生產者的Condition
,一個表示消費者的Condition
,喚醒的時候調用對應的signalAll()
方法就可以解決假死現象。
三、小結
最後我們來總結一下,對於生產者和消費者模型,通過合理的編程實現,可以充分充分發揮 cpu 多線程的特性,顯著的提升系統處理數據的效率。
對於生產者和消費者模型中的假死現象,可以使用ReentrantLock
定義兩個Condition
,進行交叉喚醒,以解決假死問題。
四、參考
1、https://www.cnblogs.com/xrq730/p/4855663.html
作者:程式員志哥
出處:pzblog.cn
資源:微信搜【程式員志哥】關註我,回覆 【技術資料】有我準備的一線程式必備電腦書籍、大廠面試資料和免費電子書。 希望可以幫助大家提升技術和能力。