第八章 ArrayBlockingQueue源碼解析

来源:http://www.cnblogs.com/java-zhao/archive/2016/01/16/5135410.html
-Advertisement-
Play Games

註意:在閱讀本文之前或在閱讀的過程中,需要用到ReentrantLock,內容見《第五章 ReentrantLock源碼解析1--獲得非公平鎖與公平鎖lock()》《第六章 ReentrantLock源碼解析2--釋放鎖unlock()》《第七章 ReentrantLock總結》1、對於ArrayB...


註意:在閱讀本文之前或在閱讀的過程中,需要用到ReentrantLock,內容見《第五章 ReentrantLock源碼解析1--獲得非公平鎖與公平鎖lock()》《第六章 ReentrantLock源碼解析2--釋放鎖unlock()》《第七章 ReentrantLock總結

1、對於ArrayBlockingQueue需要掌握以下幾點

  • 創建
  • 入隊(添加元素)
  • 出隊(刪除元素)

2、創建

  • public ArrayBlockingQueue(int capacity, boolean fair)
  • public ArrayBlockingQueue(int capacity)

使用方法:

  • Queue<String> abq = new ArrayBlockingQueue<String>(2);
  • Queue<String> abq = new ArrayBlockingQueue<String>(2,true);

通過使用方法,可以看出ArrayBlockingQueue支持ReentrantLock的公平鎖模式與非公平鎖模式,對於這兩種模式,查看本文開頭的文章即可。

源代碼如下:

    private final E[] items;//底層數據結構
    private int takeIndex;//用來為下一個take/poll/remove的索引(出隊)
    private int putIndex;//用來為下一個put/offer/add的索引(入隊)
    private int count;//隊列中元素的個數

    /*
     * Concurrency control uses the classic two-condition algorithm found in any
     * textbook.
     */

    /** Main lock guarding all access */
    private final ReentrantLock lock;//
    /** Condition for waiting takes */
    private final Condition notEmpty;//等待出隊的條件
    /** Condition for waiting puts */
    private final Condition notFull;//等待入隊的條件
View Code
    /**
     * 創造一個隊列,指定隊列容量,指定模式
     * @param fair
     * true:先來的線程先操作
     * false:順序隨機
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = (E[]) new Object[capacity];//初始化類變數數組items
        lock = new ReentrantLock(fair);//初始化類變數鎖lock
        notEmpty = lock.newCondition();//初始化類變數notEmpty Condition
        notFull = lock.newCondition();//初始化類變數notFull Condition
    }

    /**
     * 創造一個隊列,指定隊列容量,預設模式為非公平模式
     * @param capacity <1會拋異常
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
View Code

註意:

  • ArrayBlockingQueue的組成:一個對象數組+1把鎖ReentrantLock+2個條件Condition
  • 在查看源碼的過程中,也要模仿帶條件鎖的使用,這個雙條件鎖模式是很經典的模式

3、入隊

3.1、public boolean offer(E e)

原理:

  • 在隊尾插入一個元素, 如果隊列沒滿,立即返回true; 如果隊列滿了,立即返回false

使用方法:

  • abq.offer("hello1");

源代碼:

    /**
     * 在隊尾插入一個元素,
     * 如果隊列沒滿,立即返回true;
     * 如果隊列滿了,立即返回false
     * 註意:該方法通常優於add(),因為add()失敗直接拋異常
     */
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)//數組滿了
                return false;
            else {//數組沒滿
                insert(e);//插入一個元素
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
View Code
    private void insert(E x) {
        items[putIndex] = x;//插入元素
        putIndex = inc(putIndex);//putIndex+1
        ++count;//元素數量+1
        /**
         * 喚醒一個線程
         * 如果有任意一個線程正在等待這個條件,那麼選中其中的一個區喚醒。
         * 在從等待狀態被喚醒之前,被選中的線程必須重新獲得鎖
         */
        notEmpty.signal();
    }
View Code
    /**
     * i+1,數組下標+1
     */
    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }
View Code

代碼非常簡單,流程看註釋即可,只有一點註意點:

  • 在插入元素結束後,喚醒等待notEmpty條件(即獲取元素)的線程,可以發現這類似於生產者-消費者模式

 

3.2、public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException

原理:

  • 在隊尾插入一個元素,,如果數組已滿,則進入等待,直到出現以下三種情況:
    • 被喚醒
    • 等待時間超時
    • 當前線程被中斷

使用方法:

        try {
            abq.offer("hello2",1000,TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
View Code

源代碼:

    /**
     * 在隊尾插入一個元素,
     * 如果數組已滿,則進入等待,直到出現以下三種情況:
     * 1、被喚醒
     * 2、等待時間超時
     * 3、當前線程被中斷
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {

        if (e == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);//將超時時間轉換為納秒
        final ReentrantLock lock = this.lock;
        /*
         * lockInterruptibly():
         * 1、 在當前線程沒有被中斷的情況下獲取鎖。
         * 2、如果獲取成功,方法結束。
         * 3、如果鎖無法獲取,當前線程被阻塞,直到下麵情況發生:
         * 1)當前線程(被喚醒後)成功獲取鎖
         * 2)當前線程被其他線程中斷
         * 
         * lock()
         * 獲取鎖,如果鎖無法獲取,當前線程被阻塞,直到鎖可以獲取並獲取成功為止。
         */
        lock.lockInterruptibly();//加可中斷的鎖
        try {
            for (;;) {
                if (count != items.length) {//隊列未滿
                    insert(e);
                    return true;
                }
                if (nanos <= 0)//已超時
                    return false;
                try {
                    /*
                     * 進行等待:
                     * 在這個過程中可能發生三件事:
                     * 1、被喚醒-->繼續當前這個for(;;)迴圈
                     * 2、超時-->繼續當前這個for(;;)迴圈
                     * 3、被中斷-->之後直接執行catch部分的代碼
                     */
                    nanos = notFull.awaitNanos(nanos);//進行等待(在此過程中,時間會流失,在此過程中,線程也可能被喚醒)
                } catch (InterruptedException ie) {//在等待的過程中線程被中斷
                    notFull.signal(); // 喚醒其他未被中斷的線程
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }
View Code

註意:

  • awaitNanos(nanos)是AQS中的一個方法,這裡就不詳細說了,有興趣的自己去查看AQS的源代碼。
  • lockInterruptibly()與lock()的區別見註釋

 

3.3、public void put(E e) throws InterruptedException

原理:

  • 在隊尾插入一個元素,如果隊列滿了,一直阻塞,直到數組不滿了或者線程被中斷

使用方法:

        try {
            abq.put("hello1");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
View Code

源代碼:

    /**
     * 在隊尾插入一個元素
     * 如果隊列滿了,一直阻塞,直到數組不滿了或者線程被中斷
     */
    public void put(E e) throws InterruptedException {
        if (e == null)
            throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)//隊列滿了,一直阻塞在這裡
                    /*
                     * 一直等待條件notFull,即被其他線程喚醒
                     * (喚醒其實就是,有線程將一個元素出隊了,然後調用notFull.signal()喚醒其他等待這個條件的線程,同時隊列也不慢了)
                     */
                    notFull.await();
            } catch (InterruptedException ie) {//如果被中斷
                notFull.signal(); // 喚醒其他等待該條件(notFull,即入隊)的線程
                throw ie;
            }
            insert(e);
        } finally {
            lock.unlock();
        }
    }
View Code

 

4、出隊

4.1、public E poll()

原理:

  • 如果沒有元素,直接返回null;如果有元素,將隊頭元素置null,但是要註意隊頭是隨時變化的,並非一直是items[0]。

使用方法:

abq.poll();

源代碼:

    /**
     * 出隊
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == 0)//如果沒有元素,直接返回null,而非拋出異常
                return null;
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }
View Code
    /**
     * 出隊
     */
    private E extract() {
        final E[] items = this.items;
        E x = items[takeIndex];//獲取出隊元素
        items[takeIndex] = null;//將出隊元素位置置空
        /*
         * 第一次出隊的元素takeIndex==0,第二次出隊的元素takeIndex==1
         * (註意:這裡出隊之後,並沒有將後面的數組元素向前移)
         */
        takeIndex = inc(takeIndex);
        --count;//數組元素個數-1
        notFull.signal();//數組已經不滿了,喚醒其他等待notFull條件的線程
        return x;//返回出隊的元素
    }
View Code

 

4.2、public E poll(long timeout, TimeUnit unit) throws InterruptedException

原理:

  • 從對頭刪除一個元素,如果數組不空,出隊;如果數組已空且已經超時,返回null;如果數組已空且時間未超時,則進入等待,直到出現以下三種情況:
    • 被喚醒
    • 等待時間超時
    • 當前線程被中斷

使用方法:

        try {
            abq.poll(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
View Code

源代碼:

    /**
     * 從對頭刪除一個元素,
     * 如果數組不空,出隊;
     * 如果數組已空,判斷時間是否超時,如果已經超時,返回null
     * 如果數組已空且時間未超時,則進入等待,直到出現以下三種情況:
     * 1、被喚醒
     * 2、等待時間超時
     * 3、當前線程被中斷
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);//將時間轉換為納秒
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                if (count != 0) {//數組不空
                    E x = extract();//出隊
                    return x;
                }
                if (nanos <= 0)//時間超時
                    return null;
                try {
                    /*
                     * 進行等待:
                     * 在這個過程中可能發生三件事:
                     * 1、被喚醒-->繼續當前這個for(;;)迴圈
                     * 2、超時-->繼續當前這個for(;;)迴圈
                     * 3、被中斷-->之後直接執行catch部分的代碼
                     */
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }

            }
        } finally {
            lock.unlock();
        }
    }
View Code

 

4.3、public E take() throws InterruptedException

原理:

  • 將隊頭元素出隊,如果隊列空了,一直阻塞,直到數組不為空或者線程被中斷

使用方法:

        try {
            abq.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
View Code

源代碼:

    /**
     * 將隊頭元素出隊
     * 如果隊列空了,一直阻塞,直到數組不為空或者線程被中斷
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)//如果數組為空,一直阻塞在這裡
                    /*
                     * 一直等待條件notEmpty,即被其他線程喚醒
                     * (喚醒其實就是,有線程將一個元素入隊了,然後調用notEmpty.signal()喚醒其他等待這個條件的線程,同時隊列也不空了)
                     */
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }
View Code

  

總結:

1、具體入隊與出隊的原理圖:這裡只說一種情況,見下圖,途中深色部分表示已有元素,淺色部分沒有元素。

 

上面這種情況是怎麼形成的呢?當隊列滿了,這時候,隊頭元素為items[0]出隊了,就形成上邊的這種情況。

假設現在又要出隊了,則現在的隊頭元素是items[1],出隊後就形成下麵的情形。

 

出隊後,對頭元素就是items[2]了,假設現在有一個元素將要入隊,根據inc方法,我們可以得知,他要插入到items[0]去,入隊了形成下圖:

以上就是整個入隊出隊的流程,inc方法上邊已經給出,這裡再貼一遍:

    /**
     * i+1,數組下標+1
     * 註意:這裡這樣寫的原因。
     */
    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }
View Code

 

2、三種入隊對比:

  • offer(E e):如果隊列沒滿,立即返回true; 如果隊列滿了,立即返回false-->不阻塞
  • put(E e):如果隊列滿了,一直阻塞,直到數組不滿了或者線程被中斷-->阻塞
  • offer(E e, long timeout, TimeUnit unit):在隊尾插入一個元素,,如果數組已滿,則進入等待,直到出現以下三種情況:-->阻塞
    • 被喚醒
    • 等待時間超時
    • 當前線程被中斷

 

3、三種出對對比:

  • poll():如果沒有元素,直接返回null;如果有元素,出隊
  • take():如果隊列空了,一直阻塞,直到數組不為空或者線程被中斷-->阻塞
  • poll(long timeout, TimeUnit unit):如果數組不空,出隊;如果數組已空且已經超時,返回null;如果數組已空且時間未超時,則進入等待,直到出現以下三種情況:
    • 被喚醒
    • 等待時間超時
    • 當前線程被中斷

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Hacker Typer 讓你也能像電影中的黑客那樣寫代碼咯~~~
  • 2016-01-16 之前只是大概瞭解過c#語言,感覺掌握不牢靠.現在開始系統學習C#.現以該博客作為學習筆記,方便後續查看.C# 目標:系統掌握c#知識 時間:30天 範圍:C#基礎,Winform應用,C#高級,SQL及.net網頁.
  • WPF預設程式啟動:新建project後自動生成的App.xaml中指定程式啟動方式(StartupUri="MainWindow.xaml"),如下代碼所示,啟動MainWindow頁面 WPF用Main函數方式啟動程式:自己寫Main函數作為啟動點1.在WPF自動生成的App.cs文件中寫M.....
  • lock與C#多線程 lock 關鍵字將語句塊標記為臨界區,方法是獲取給定對象的互斥鎖,執行語句,然後釋放該鎖。簡單講就類似於 你去銀行辦理業務,一個櫃臺一次只能操作以為客戶,而如果你要到這個櫃臺辦理業務就必須等前面的人的業務完成,而彼此之間不會有交集。下麵通過具體的代碼來深入說明: using ....
  • PHP是動態類型的Web開發的腳本語言,PHP以頁面文件作為載入和運行的單元,PHP現在有了Composer作為開發包管理。1.使用Composer管理依賴自從.NET開發用了Nuget管理程式集依賴,我就再也離不開它了,幸虧Java中也有Maven管理jar包,雖然開源中國的鏡像太慢但還有ibib...
  • 1、對於LinkedBlockingQueue需要掌握以下幾點創建入隊(添加元素)出隊(刪除元素)2、創建Node節點內部類與LinkedBlockingQueue的一些屬性 static class Node { E item;//節點封裝的數據 /** ...
  • 介紹Sublime編輯器 Sublime Text 3官方版是Sublime Text2的升級版。Sublime Text是一款流行的文本編輯器軟體,有點類似於TextMate,跨平臺,可運行在Linux,Windows和Mac OS X。也是許多程式員喜歡使用的一款文本編輯器軟體。 Sublime...
  • 用HTML界面來強化VBS和JS本地腳本的交互能力與友好性!!!
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...