不知道你是否遇到過面試官讓你手寫生產者消費者代碼。別說,前段時間有小伙伴還真的遇到了這種情況。當時是一臉懵逼。 但是,俗話說,從哪裡跌倒就要從哪裡爬起來。既然這次被問到了,那就回去好好研究一下,爭取下一次不再被虐唄。 於是,今天我決定手敲一個生產者消費者模式壓壓驚。(因為我也不想以後被面試官血虐啊) ...
不知道你是否遇到過面試官讓你手寫生產者消費者代碼。別說,前段時間有小伙伴還真的遇到了這種情況。當時是一臉懵逼。
但是,俗話說,從哪裡跌倒就要從哪裡爬起來。既然這次被問到了,那就回去好好研究一下,爭取下一次不再被虐唄。
於是,今天我決定手敲一個生產者消費者模式壓壓驚。(因為我也不想以後被面試官血虐啊)
生產者消費者模式,其實很簡單。無非就是生產者不停的生產數據,消費者不停的消費數據。(這不廢話嗎,字面意思我也知道啊)
咳咳。其實,我們可以拿水池來舉例。
比如,現在要用多個註水管往水池裡邊註水,那這些註水管就認為是生產者。從水池裡邊抽水的抽水管就是消費者。水池本身就是一個緩衝區,用於生產者消費者之間的通訊。
好的,跟著我的思路。
既然生產者是生產數據的,那總得定義一個數據類吧(Data)
public class Data {
private int id;
private int num;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
public Data(int id, int num) {
this.id = id;
this.num = num;
}
public Data() {
}
}
以上數據,假設註水管每次註水的id和註水容量num(單位是升)都是遞增的。並且,單次出水管的出水量和註水管的註水量是一一對應的。
生產者的類Producer和消費者類Consumer內部都需要維護一個阻塞隊列,來存儲緩衝區的數據。
public class Producer implements Runnable{
//共用阻塞隊列
private BlockingDeque<Data> queue;
//是否還在運行
private volatile boolean isRunning = true;
//id生成器
private static AtomicInteger count = new AtomicInteger();
//生成隨機數
private static Random random = new Random();
public Producer(BlockingDeque<Data> queue){
this.queue = queue;
}
@Override
public void run() {
try {
while(isRunning){
//模擬註水耗時
Thread.sleep(random.nextInt(1000));
int num = count.incrementAndGet();
Data data = new Data(num, num);
System.out.println("當前>>註水管:"+Thread.currentThread().getName()+"註水容量(L):"+num);
if(!queue.offer(data,2, TimeUnit.SECONDS)){
System.out.println("註水失敗...");
}
}
}catch (Exception e){
e.printStackTrace();
}
}
public void stop(){
isRunning = false;
}
}
消費者:
public class Consumer implements Runnable{
private BlockingDeque<Data> queue ;
private static Random random = new Random();
public Consumer(BlockingDeque<Data> queue){
this.queue = queue;
}
@Override
public void run() {
while (true){
try {
Data data = queue.take();
//模擬抽水耗時
Thread.sleep(random.nextInt(1000));
if(data != null){
System.out.println("當前<<抽水管:"+Thread.currentThread().getName()+",抽取水容量(L):"+data.getNum());
}
}catch (Exception e){
e.printStackTrace();
}
}
}
}
測試類,假設有三個註水管和三個出水管(即六個線程)同時運行。等一定時間後,所有註水管停止註水,則當水池空(阻塞隊列為空)的時候,出水管也將不再出水。
public class TestProC {
public static void main(String[] args) throws InterruptedException {
BlockingDeque<Data> queue = new LinkedBlockingDeque<>(10);
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer1 = new Consumer(queue);
Consumer consumer2 = new Consumer(queue);
Consumer consumer3 = new Consumer(queue);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer1);
service.execute(consumer2);
service.execute(consumer3);
Thread.sleep(3000);
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(1000);
service.shutdown();
}
}
運行結果如下:
到最後一次註水20L的時候,所有註水管都停止註水了,但此時水池還沒空。於是,所有出水管繼續消費水資源,直到最後20L也被消費完。
以上,就是一個典型的生產者消費者模式。
可以看到,這種模式有很多優點:
1)可以解耦消費者和生產者,因為它們是兩個不同的類,互相之間不會產生影響。
2)支持併發。生產者只管生產數據就行了,生產完直接把數據丟到緩衝區,而不需要等消費者消費完數據才可以生產下一個數據。否則會造成阻塞,從而影響效率。
3)允許生產者和消費者有不同的處理速度。如,當生產者生產數據比較快的時候,會把消費者還沒來得及處理的數據先放到緩衝區。等有空閑的消費者了,再去緩衝區拿去數據。
另外,以上的緩衝區,我們一般會使用阻塞隊列。就像上邊用的LinkedBlockingDeque。
這樣,當隊列滿的時候,會阻塞生產者繼續往隊列添加數據,直到有消費者來消費了隊列中的數據。當隊列空的時候,也會阻塞消費者從隊列獲取數據,直到有生產者把數據放入到隊列中。
阻塞隊列最好使用有界隊列(代碼中指定的容量為10)。因為,如果生產者的速度遠遠大於消費者時,就會有可能造成隊列的元素一直增加,直到記憶體耗盡。當然,這也需要看實際的業務情況。如果能保證生產者的數量在可控範圍內,不會給記憶體造成壓力,用無界隊列,也未嘗不可。