併發隊列: 在併發隊列中,JDK有兩套實現: ConcurrentLinkedQueue:非阻塞式隊列 BlockingQueue:阻塞式隊列 阻塞式隊列非阻塞式隊列的區別: 阻塞式隊列入列操作的時候,如果超出隊列總數,這個時候會進行等待;在出列的時候,如果隊列為空,也會等待 非阻塞無論如何都不等待 ...
併發隊列:
在併發隊列中,JDK有兩套實現:
ConcurrentLinkedQueue:非阻塞式隊列
BlockingQueue:阻塞式隊列
阻塞式隊列非阻塞式隊列的區別:
阻塞式隊列入列操作的時候,如果超出隊列總數,這個時候會進行等待;在出列的時候,如果隊列為空,也會等待
非阻塞無論如何都不等待
非阻塞效率更高,但是阻塞使用更廣泛
阻塞隊列的優點是能夠防止隊列容器溢出,防止丟失
非阻塞隊列:
public class QueueDemo { public static void main(String[] args) { ConcurrentLinkedQueue<String> concurrentLinkedQueue = new ConcurrentLinkedQueue<>(); concurrentLinkedQueue.offer("張三"); concurrentLinkedQueue.offer("李四"); concurrentLinkedQueue.offer("王五"); for (int i = 0; i < 4; i++) { System.out.println(concurrentLinkedQueue.poll()); } } }
列印如下:
張三 李四 王五 null
阻塞隊列(重要):需要初始化隊列總數
public class QueueDemo { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3); //添加非阻塞式隊列 arrayBlockingQueue.offer("張三"); arrayBlockingQueue.offer("李四"); arrayBlockingQueue.offer("王五"); //添加阻塞式隊列,等待時間為3s arrayBlockingQueue.offer("趙六",3, TimeUnit.SECONDS); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll(3,TimeUnit.SECONDS)); } }
這種情況,等待3秒後列印:張三,李四,王五,再等待3秒後列印:null
換一下代碼:
public class QueueDemo { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3); //添加非阻塞式隊列 arrayBlockingQueue.offer("張三"); arrayBlockingQueue.offer("李四"); System.out.println(arrayBlockingQueue.poll()); arrayBlockingQueue.offer("王五"); //添加阻塞式隊列,等待時間為3s arrayBlockingQueue.offer("趙六",3, TimeUnit.SECONDS); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll(3,TimeUnit.SECONDS)); } }
這種情況,立即列印張三,李四,王五,趙六,等待3秒後列印null
示例:
public class QueueDemo { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3); //添加非阻塞式隊列 boolean success1 = arrayBlockingQueue.offer("張三"); boolean success2 = arrayBlockingQueue.offer("李四"); boolean success3 = arrayBlockingQueue.offer("王五"); //添加阻塞式隊列,等待時間為3s boolean success4 = arrayBlockingQueue.offer("趙六",3, TimeUnit.SECONDS); System.out.println(success1); System.out.println(success2); System.out.println(success3); System.out.println(success4); } }
等待3秒後列印:true,true,true,false;說明趙六沒有入列成功
生產者消費者示例:
下麵模擬一個生產者消費者的例子,以便於更好地理解:
生產者線程存一個隊列,消費者線程取一個隊列,多線程中可以採用等待喚醒機制,在這裡採用併發隊列實現
package org.dreamtech; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * 生產者線程,負責添加隊列 */ class ProducerThread implements Runnable { private BlockingQueue<String> blockingQueue; private volatile boolean FLAG = true; private AtomicInteger atomicInteger = new AtomicInteger(); ProducerThread(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { try { System.out.println("---生產者線程啟動成功---"); while (FLAG) { String data = atomicInteger.incrementAndGet() + ""; boolean success = blockingQueue.offer(data, 2, TimeUnit.SECONDS); if (success) { System.out.println("---生產者存入隊列成功->data:" + data + "---"); } else { System.out.println("---生產者存入隊列失敗->data:" + data + "---"); } Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("---生產者線程已經結束---"); } } public void stop() { this.FLAG = false; } } /** * 消費者線程,負責獲取隊列 */ class ConsumerThread implements Runnable { private BlockingQueue<String> blockingQueue; private boolean FLAG = true; ConsumerThread(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { try { System.out.println("---消費者線程啟動成功---"); while (FLAG) { String data = blockingQueue.poll(2, TimeUnit.SECONDS); if (data == null) { System.out.println("---消費者沒有獲取到隊列信息---"); FLAG = false; return; } System.out.println("---消費者獲得隊列信息->data:" + data + "---"); } } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("---消費者線程已經結束---"); } } } public class Test { public static void main(String[] args) { try { BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(10); ProducerThread producerThread = new ProducerThread(blockingQueue); ConsumerThread consumerThread = new ConsumerThread(blockingQueue); Thread producer = new Thread(producerThread); Thread consumer = new Thread(consumerThread); producer.start(); consumer.start(); Thread.sleep(10000); producerThread.stop(); } catch (InterruptedException e) { e.printStackTrace(); } } }
列印如下:
---消費者線程啟動成功---
---生產者線程啟動成功---
---生產者存入隊列成功->data:1---
---消費者獲得隊列信息->data:1---
---生產者存入隊列成功->data:2---
---消費者獲得隊列信息->data:2---
.............................................
---生產者存入隊列成功->data:9---
---消費者獲得隊列信息->data:9---
---生產者存入隊列成功->data:10---
---消費者獲得隊列信息->data:10---
---生產者線程已經結束---
---消費者沒有獲取到隊列信息---
---消費者線程已經結束---