java多線程-BlockingQueue

来源:http://www.cnblogs.com/lcngu/archive/2016/02/28/5224476.html
-Advertisement-
Play Games

BlockingQueue簡介 ArrayBlockingQueue:基於數組實現的一個阻塞隊列,在創建ArrayBlockingQueue對象時必須制定容量大小。並且可以指定公平性與非公平性,預設情況下為非公平的,即不保證等待時間最長的隊列最優先能夠訪問隊列。 LinkedBlockingQueu


  • BlockingQueue簡介

  ArrayBlockingQueue:基於數組實現的一個阻塞隊列,在創建ArrayBlockingQueue對象時必須制定容量大小。並且可以指定公平性與非公平性,預設情況下為非公平的,即不保證等待時間最長的隊列最優先能夠訪問隊列。

  LinkedBlockingQueue:基於鏈表實現的一個阻塞隊列,在創建LinkedBlockingQueue對象時如果不指定容量大小,則預設大小為Integer.MAX_VALUE,每次插入後都將動態地創建鏈接節點。

  PriorityBlockingQueue:以上2種隊列都是先進先出隊列,而PriorityBlockingQueue卻不是,它會按照元素的優先順序對元素進行排序,按照優先順序順序出隊,每次出隊的元素都是優先順序最高的元素,依據對象的自然排序順序或者是構造函數所帶的Comparator決定的順序。註意,此阻塞隊列為無界阻塞隊列,即容量沒有上限(通過源碼就可以知道,它沒有容器滿的信號標誌),前面2種都是有界隊列。

  DelayQueue:基於PriorityQueue,一種延時阻塞隊列,DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue也是一個無界隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。

  SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的。其中每個插入操作必須等待另一個線程的對應移除操作 ,反之亦然。

  • BlockingQueue內容

  BlockingQueue主要方法:

  拋出異常 特殊值 阻塞 超時
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
檢查 element() peek() 不可用 不可用

  對於非阻塞隊列,一般情況下建議使用offer、poll和peek三個方法,不建議使用add和remove方法。因為使用offer、poll和peek三個方法可以通過返回值判斷操作成功與否,而使用add和remove方法卻不能達到這樣的效果。註意,非阻塞隊列中的方法都沒有進行同步措施。

  • BlockingQueue實現原理

  以ArrayBlockingQueue為例,查看其源代碼,其中主要包含以下對象:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -817911632652898426L;

    /** 數組對象,用於放置對象 */
    final Object[] items;

    /** put, offer, or add方法放入數組的索引 */
    int putIndex;

    /**  take, poll, peek or remove方法取出數據的數組索引 */
    int takeIndex;
    /** queue隊列的總數 */
    int count;

    /**可重入鎖,控制併發*/
    final ReentrantLock lock;
    /** 非空信號量,可以取數*/
    private final Condition notEmpty;
    /** 非滿信號量,可以放數 */
    private final Condition notFull;
}

  下麵主要介紹下put()和take()方法,來觀察其同步的實現:

 1 public void put(E e) throws InterruptedException {
 2         checkNotNull(e);
 3         final ReentrantLock lock = this.lock;
 4         lock.lockInterruptibly();
 5         try {
 6             while (count == items.length)
 7                 notFull.await();
 8             insert(e);
 9         } finally {
10             lock.unlock();
11         }
12 }
 1 public E take() throws InterruptedException {
 2         final ReentrantLock lock = this.lock;
 3         lock.lockInterruptibly();
 4         try {
 5             while (count == 0)
 6                 notEmpty.await();
 7             return extract();
 8         } finally {
 9             lock.unlock();
10         }
11     }

  大家應該明白了阻塞隊列的實現原理,事實它和我們用Object.wait()、Object.notify()和非阻塞隊列實現生產者-消費者的思路類似,只不過它把這些工作一起集成到了阻塞隊列中實現。並且在前面Condition中我們也模擬實現了一個阻塞隊列,實現與其大同小異。

  • BlockingQueue應用

  1:啟動兩個線程實現互斥等待:

 1 public class BlockingQueueTest {
 2     public static void main(String[] args) {
 3         final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3);
 4         for (int i = 0; i < 2; i++) {
 5             new Thread(new Runnable() {
 6                 @Override
 7                 public void run() {
 8                     while (true) {
 9                         System.out.println("Thread "+Thread.currentThread().getName()+"正在準備放入數據");
10                         try {
11                             //模擬線程的放數速度
12                             Thread.sleep(new Random().nextInt(1000));
13                         } catch (InterruptedException e) {
14                             // TODO Auto-generated catch block
15                             e.printStackTrace();
16                         }
17                         try {
18                             queue.put(1);
19                         } catch (InterruptedException e) {
20                             // TODO Auto-generated catch block
21                             e.printStackTrace();
22                         }
23                         System.out.println("Thread "+Thread.currentThread().getName()+"放入數據,此時隊列中的數據為:"+queue.size());
24                     }
25                 }
26             }).start();
27             new Thread(new Runnable() {
28                 @Override
29                 public void run() {
30                     while (true) {
31                         System.out.println("Thread "+Thread.currentThread().getName()+"正在取得數據");
32                         try {
33                             //模擬線程的去數速度
34                             Thread.sleep(100);
35                         } catch (InterruptedException e) {
36                             // TODO Auto-generated catch block
37                             e.printStackTrace();
38                         }
39                         try {
40                             queue.take();
41                         } catch (InterruptedException e) {
42                             // TODO Auto-generated catch block
43                             e.printStackTrace();
44                         }
45                         System.out.println("Thread "+Thread.currentThread().getName()+"取得數據,此時隊列中的數據為:"+queue.size());
46                     }
47                 }
48             }).start();
49         }
50         
51     }
52 }

  2:前面介紹傳統線程通信中,主線程和子線程交替運行,現在以阻塞隊列來實現。

 1 public class BlockingQueueCommunication {
 2     public static void main(String[] args) {
 3         final Business business = new Business();
 4         new Thread(new Runnable() {
 5             
 6             @Override
 7             public void run() {
 8                 // TODO Auto-generated method stub
 9                 for (int i = 0; i < 50; i++) {
10                     try {
11                         business.sub(i);
12                     } catch (InterruptedException e) {
13                         // TODO Auto-generated catch block
14                         e.printStackTrace();
15                     }
16                 }
17             }
18         }).start();
19         for (int i = 0; i < 50; i++) {
20             try {
21                 business.main(i);
22             } catch (InterruptedException e) {
23                 // TODO Auto-generated catch block
24                 e.printStackTrace();
25             }
26         }
27     }
28     static class Business{
29         BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1);
30         BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1);
31         {
32             try {
33                 queue2.put(1);//保證queue2阻塞
34             } catch (InterruptedException e) {
35                 // TODO Auto-generated catch block
36                 e.printStackTrace();
37             }
38         }
39         
40         public void main(int i) throws InterruptedException{
41             queue1.put(1);//阻塞queue1
42             for (int j = 0; j < 100; j++) {
43                 System.out.println("main thread is looping of "+j +" in " + i);
44             }
45             queue2.take();//喚醒queue2
46         }
47         public void sub(int i) throws InterruptedException{
48             queue2.put(1);//阻塞queue2
49             for (int j = 0; j < 10; j++) {
50                 System.out.println("sub thread is looping of "+j +" in " + i);
51             }
52             queue1.take();//喚醒queue1
53         }
54     }
55 }
  BlockingQueue實現了線程同步,不可在方法中再次加入同步限制,否則會出現死鎖。

  3:在API中有一個阻塞對象實現生產者和消費者的例子

 1 class Producer implements Runnable {
 2    private final BlockingQueue queue;
 3    Producer(BlockingQueue q) { queue = q; }
 4    public void run() {
 5      try {
 6        while(true) { queue.put(produce()); }
 7      } catch (InterruptedException ex) { ... handle ...}
 8    }
 9    Object produce() { ... }
10  }
11 
12  class Consumer implements Runnable {
13    private final BlockingQueue queue;
14    Consumer(BlockingQueue q) { queue = q; }
15    public void run() {
16      try {
17        while(true) { consume(queue.take()); }
18      } catch (InterruptedException ex) { ... handle ...}
19    }
20    void consume(Object x) { ... }
21  }
22 
23  class Setup {
24    void main() {
25      BlockingQueue q = new SomeQueueImplementation();
26      Producer p = new Producer(q);
27      Consumer c1 = new Consumer(q);
28      Consumer c2 = new Consumer(q);
29      new Thread(p).start();
30      new Thread(c1).start();
31      new Thread(c2).start();
32    }
33  }

  使用阻塞隊列代碼要簡單得多,不需要再單獨考慮同步和線程間通信的問題。

  在併發編程中,一般推薦使用阻塞隊列,這樣實現可以儘量地避免程式出現意外的錯誤。

  阻塞隊列使用最經典的場景就是socket客戶端數據的讀取和解析,讀取數據的線程不斷將數據放入隊列,然後解析線程不斷從隊列取數據解析。還有其他類似的場景,只要符合生產者-消費者模型的都可以使用阻塞隊列。

  參考資料:http://www.cnblogs.com/dolphin0520/p/3932906.html

       javaAPI


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Hadoop、Redis、Memcached、MongoDB、Spark、Storm、雲計算、R語言、機器學習、Nginx、Linux、MySQL、Java EE、.NET、PHP R語言 視頻教程 實戰 數據分析 數據挖掘 入門編程 培訓 核心技術
  • 1.什麼叫做會話控制 允許伺服器根據客戶端做出的連續請求。 2.為什麼需要會話控制? 因為當你打開一個網站,並想訪問該網站的其他頁面的時候,如果沒有會話控制,當跳轉到其他頁面的 時候,就需要再次輸入賬戶和密碼。 3.Cookie的原理和作用 將客戶端的簡單信息保存在個人PC中,其他程式獲取PC的Co
  • java同步容器 在Java的集合容器框架中,主要有四大類別:List、Set、Queue、Map。List、Set、Queue介面分別繼承了Collection介面,Map本身是一個介面。註意Collection和Map是一個頂層介面,而List、Set、Queue則繼承了Collection介面
  • 1 YUV2RGB的模塊如下: 1 module yuv2rgb( 2 clk, //時鐘輸入 3 rstn, //複位輸入,低電平複位 4 5 y_in, //變換前Y分量輸出 6 cb_in, //變換前Cb分量輸出 7 cr_in, //變換前Cr分量輸出 8 ena_in, //待變換數據使
  • spring沒有採用約定優於配置的策略,spring要求顯示指定搜索哪些路徑下的Java文件。spring將會把合適的java類全部註冊成spring Bean。 問題:spring怎麼知道把哪些Java類當初bean類處理? 這就需要使用annotation,spring使用一些特殊的annota
  • 本文將通過一個 Python 實現的圖片文件批量重命名工具來闡述如何逐步提升程式質量。
  • 學了線程,收穫不少,記錄下了吧. 一、線程的主要兩種實現方法。 1.繼承Thread類,重寫run()方法 main方法中創建子類,引用調用start()方法 實例如下: //繼承Thread類,重寫run()方法 public class ThreadOne extends Thread { pu
  • 《數據結構》第2章第10節歸併擴展的線性鏈表。
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...