第九章 LinkedBlockingQueue源碼解析

来源:http://www.cnblogs.com/java-zhao/archive/2016/01/16/5135958.html
-Advertisement-
Play Games

1、對於LinkedBlockingQueue需要掌握以下幾點創建入隊(添加元素)出隊(刪除元素)2、創建Node節點內部類與LinkedBlockingQueue的一些屬性 static class Node { E item;//節點封裝的數據 /** ...


1、對於LinkedBlockingQueue需要掌握以下幾點

  • 創建
  • 入隊(添加元素)
  • 出隊(刪除元素)

2、創建

Node節點內部類與LinkedBlockingQueue的一些屬性

    static class Node<E> {
        E item;//節點封裝的數據
        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */

        Node<E> next;//下一個節點
        Node(E x) { item = x; }
    }

    /** 指定鏈表容量  */
    private final int capacity;

    /** 當前的元素個數 */
    private final AtomicInteger count = new AtomicInteger(0);

    /** 鏈表頭節點 */
    private transient Node<E> head;

    /** 鏈表尾節點 */
    private transient Node<E> last;

    /** 出隊鎖 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 出隊等待條件 */
    private final Condition notEmpty = takeLock.newCondition();

    /** 入隊鎖 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 入隊等待條件 */
    private final Condition notFull = putLock.newCondition();
View Code

2.1、public LinkedBlockingQueue(int capacity)

使用方法:

Queue<String> abq = new LinkedBlockingQueue<String>(1000);

源代碼:

    /**
     * 創建一個 LinkedBlockingQueue,容量為指定容量
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);//初始化頭節點和尾節點,均為封裝了null數據的節點
    }
View Code

註意點:

  • LinkedBlockingQueue的組成一個鏈表+兩把鎖+兩個條件

 

2.2、public LinkedBlockingQueue()

使用方法:

Queue<String> abq = new LinkedBlockingQueue<String>();

源代碼:

    /**
     * 創建一個LinkedBlockingQueue,容量為整數最大值
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
View Code

註意點:預設容量為整數最大值,可以看做沒有容量限制

 

3、入隊:

3.1、public boolean offer(E e)

原理:

  • 在隊尾插入一個元素, 如果隊列沒滿,立即返回true; 如果隊列滿了,立即返回false

使用方法:

  • abq.offer("hello1");

源代碼:

    /**
     * 在隊尾插入一個元素,
     * 容量沒滿,可以立即插入,返回true;
     * 隊列滿了,直接返回false
     * 註:如果使用了限制了容量的隊列,這個方法比add()好,因為add()插入失敗就會拋出異常
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;//獲取隊列中的元素個數
        if (count.get() == capacity)//隊列滿了
            return false;
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        putLock.lock();//獲取入隊鎖
        try {
            if (count.get() < capacity) {//容量沒滿
                enqueue(e);//入隊
                c = count.getAndIncrement();//容量+1
                if (c + 1 < capacity)//如果上邊+1後的容量再加1,還小於指定容量(說明在插入當前元素後,至少還可以再插一個元素)
                    notFull.signal();//喚醒等待notFull條件的其中一個線程
            }
        } finally {
            putLock.unlock();//釋放入隊鎖
        }
        if (c == 0)//如果c==0,這是什麼情況?
            signalNotEmpty();
        return c >= 0;
    }
View Code
    /**
     * 創建一個節點,並加入鏈表尾部
     * @param x
     */
    private void enqueue(E x) {
        /*
         * 封裝新節點,並賦給當前的最後一個節點的下一個節點,然後在將這個節點設為最後一個節點
         */
        last = last.next = new Node<E>(x);
    }
View Code
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();//獲取出隊鎖
        try {
            notEmpty.signal();//喚醒等待notEmpty條件的線程中的一個
        } finally {
            takeLock.unlock();//釋放出隊鎖
        }
    }
View Code

如果,入隊邏輯不懂,查看最後總結部分入隊邏輯的圖,代碼非常簡單,流程看註釋即可,只有一點註意點:

  • c==0是怎麼出現的?

 

3.2、public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException

原理:

  • 在隊尾插入一個元素,,如果隊列已滿,則進入等待,直到出現以下三種情況:
    • 被喚醒
    • 等待時間超時
    • 當前線程被中斷

使用方法:

        try {
            abq.offer("hello2",1000,TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
View Code

源代碼:

    /**
     * 在隊尾插入一個元素,,如果隊列已滿,則進入等待,直到出現以下三種情況: 
     * 1、被喚醒 
     * 2、等待時間超時 
     * 3、當前線程被中斷
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {

        if (e == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);// 轉換為納秒
        int c = -1;
        final ReentrantLock putLock = this.putLock;// 入隊鎖
        final AtomicInteger count = this.count;// 總數量
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {// 容量已滿
                if (nanos <= 0)// 已經超時
                    return false;
                /*
                 * 進行等待: 在這個過程中可能發生三件事: 
                 * 1、被喚醒-->繼續當前這個while迴圈
                 * 2、超時-->繼續當前這個while迴圈 
                 * 3、被中斷-->拋出中斷異常InterruptedException
                 */
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);// 入隊
            c = count.getAndIncrement();// 入隊元素數量+1
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }
View Code

註意:

  • awaitNanos(nanos)是AQS中的一個方法,這裡就不詳細說了,有興趣的自己去查看AQS的源代碼。

 

3.3、public void put(E e) throws InterruptedException

原理:

  • 在隊尾插入一個元素,如果隊列滿了,一直阻塞,直到隊列不滿了或者線程被中斷

使用方法:

        try {
            abq.put("hello1");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
View Code

源代碼:

    /**
     * 在隊尾插一個元素
     * 如果隊列滿了,一直阻塞,直到隊列不滿了或者線程被中斷
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        final ReentrantLock putLock = this.putLock;//入隊鎖
        final AtomicInteger count = this.count;//當前隊列中的元素個數
        putLock.lockInterruptibly();//加鎖
        try {
            while (count.get() == capacity) {//如果隊列滿了 
                /*
                 * 加入notFull等待隊列,直到隊列元素不滿了,
                 * 被其他線程使用notFull.signal()喚醒
                 */
                notFull.await();
            }
            enqueue(e);//入隊
            c = count.getAndIncrement();//入隊數量+1
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
View Code

 

4、出隊

4.1、public E poll()

原理:

  • 如果沒有元素,直接返回null;如果有元素,出隊

使用方法:

abq.poll();

源代碼:

    /**
     * 出隊:
     * 1、如果沒有元素,直接返回null
     * 2、如果有元素,出隊
     */
    public E poll() {
        final AtomicInteger count = this.count;//獲取元素數量
        if (count.get() == 0)//沒有元素
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();//獲取出隊鎖
        try {
            if (count.get() > 0) {//有元素
                x = dequeue();//出隊
                //元素個數-1(註意:該方法是一個無限迴圈,直到減1成功為止)
                c = count.getAndDecrement();
                if (c > 1)//還有元素
                    notEmpty.signal();//喚醒等待在notEmpty隊列中的其中一條線程
            }
        } finally {
            takeLock.unlock();//釋放出隊鎖
        }
        if (c == capacity)//c == capacity是怎麼發生的
            signalNotFull();
        return x;
    }
View Code
    /**
     * 從隊列頭部移除一個節點
     */
    private E dequeue() {
        Node<E> h = head;//獲取頭節點:x==null
        Node<E> first = h.next;//將頭節點的下一個節點賦值給first
        h.next = h; // 將當前將要出隊的節點置null(為了使其做head節點做準備)
        head = first;//將當前將要出隊的節點作為了頭節點
        E x = first.item;//獲取出隊節點的值
        first.item = null;//將出隊節點的值置空
        return x;
    }
View Code
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
View Code

註意:出隊邏輯如果不懂,查看最後總結部分的圖

 

4.2、public E poll(long timeout, TimeUnit unit) throws InterruptedException

原理:

  • 從隊頭刪除一個元素,如果隊列不空,出隊;如果隊列已空且已經超時,返回null;如果隊列已空且時間未超時,則進入等待,直到出現以下三種情況:
    • 被喚醒
    • 等待時間超時
    • 當前線程被中斷

使用方法:

        try {
            abq.poll(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
View Code

源代碼:

    /**
     * 從隊列頭部刪除一個元素,
     * 如果隊列不空,出隊;
     * 如果隊列已空,判斷時間是否超時,如果已經超時,返回null
     * 如果隊列已空且時間未超時,則進入等待,直到出現以下三種情況:
     * 1、被喚醒
     * 2、等待時間超時
     * 3、當前線程被中斷
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {//如果隊列沒有元素
                if (nanos <= 0)//已經超時
                    return null;
                /*
                 * 進行等待:
                 * 在這個過程中可能發生三件事:
                 * 1、被喚醒-->繼續當前這個while迴圈
                 * 2、超時-->繼續當前這個while迴圈
                 * 3、被中斷-->拋出異常
                 */
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();//出隊
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
View Code

 

4.3、public E take() throws InterruptedException

原理:

  • 將隊頭元素出隊,如果隊列空了,一直阻塞,直到隊列不為空或者線程被中斷

使用方法:

        try {
            abq.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
View Code

源代碼:

    /**
     * 出隊:
     * 如果隊列空了,一直阻塞,直到隊列不為空或者線程被中斷
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;//獲取隊列中的元素總量
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();//獲取出隊鎖
        try {
            while (count.get() == 0) {//如果沒有元素,一直阻塞
                /*
                 * 加入等待隊列, 一直等待條件notEmpty(即被其他線程喚醒)
                 * (喚醒其實就是,有線程將一個元素入隊了,然後調用notEmpty.signal()喚醒其他等待這個條件的線程,同時隊列也不空了)
                 */
                notEmpty.await();
            }
            x = dequeue();//出隊
            c = count.getAndDecrement();//元素數量-1
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
View Code

 

總結:

1、具體入隊與出隊的原理圖

圖中每一個節點前半部分表示封裝的數據x,後邊的表示指向的下一個引用。

1.1、初始化

 初始化之後,初始化一個數據為null,且head和last節點都是這個節點。

1.2、入隊兩個元素過後

這個可以根據入隊方法enqueue(E x)來看,源代碼再貼一遍:

    /**
     * 創建一個節點,並加入鏈表尾部
     * 
     * @param x
     */
    private void enqueue(E x) {
        /*
         * 封裝新節點,並賦給當前的最後一個節點的下一個節點,然後在將這個節點設為最後一個節點
         */
        last = last.next = new Node<E>(x);
    }
View Code

其實這我們就可以發現其實真正意義上出隊的頭節點是Head節點的下一個節點。(這也就是Node這個內部類中對next的註釋,我沒有翻譯)

1.3、出隊一個元素後

錶面上看,只是將頭節點的next指針指向了要刪除的x1.next,事實上這樣我覺的就完全可以,但是jdk實際上是將原來的head節點刪除了,而上邊看到的這個head節點,正是剛剛出隊的x1節點,只是其值被置空了。

這一塊對應著源代碼來看:dequeue()

    /**
     * 從隊列頭部移除一個節點
     */
    private E dequeue() {
        Node<E> h = head;// 獲取頭節點:x==null
        Node<E> first = h.next;// 將頭節點的下一個節點賦值給first
        h.next = h; // 將當前將要出隊的節點置null(為了使其做head節點做準備)
        head = first;// 將當前將要出隊的節點作為了頭節點
        E x = first.item;// 獲取出隊節點的值
        first.item = null;// 將出隊節點的值置空
        return x;
    }
View Code

 

2、三種入隊對比:

  • offer(E e):如果隊列沒滿,立即返回true; 如果隊列滿了,立即返回false-->不阻塞
  • put(E e):如果隊列滿了,一直阻塞,直到隊列不滿了或者線程被中斷-->阻塞
  • offer(E e, long timeout, TimeUnit unit):在隊尾插入一個元素,,如果隊列已滿,則進入等待,直到出現以下三種情況:-->阻塞
    • 被喚醒
    • 等待時間超時
    • 當前線程被中斷

 

3、三種出隊對比:

  • poll():如果沒有元素,直接返回null;如果有元素,出隊
  • take():如果隊列空了,一直阻塞,直到隊列不為空或者線程被中斷-->阻塞
  • poll(long timeout, TimeUnit unit):如果隊列不空,出隊;如果隊列已空且已經超時,返回null;如果隊列已空且時間未超時,則進入等待,直到出現以下三種情況:
    • 被喚醒
    • 等待時間超時
    • 當前線程被中斷

 

4、ArrayBlockingQueue與LinkedBlockingQueue對比

  • ArrayBlockingQueue:
    • 一個對象數組+一把鎖+兩個條件
    • 入隊與出隊都用同一把鎖
    • 在只有入隊高併發或出隊高併發的情況下,因為操作數組,且不需要擴容,性能很高
    • 採用了數組,必須指定大小,即容量有限
  • LinkedBlockingQueue:
    • 一個單向鏈表+兩把鎖+兩個條件
    • 兩把鎖,一把用於入隊,一把用於出隊,有效的避免了入隊與出隊時使用一把鎖帶來的競爭。
    • 在入隊與出隊都高併發的情況下,性能比ArrayBlockingQueue高很多
    • 採用了鏈表,最大容量為整數最大值,可看做容量無限

 兩個疑問:(望大神指點)

  • 入隊時:c==0是怎樣出現的?
  • 出隊時:c==capcity是怎樣出現的?

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1、利用brctl命令創建虛擬網橋br0brctl addbr br0ifconfig br0 up //上述兩條命令分開執行會導致網路斷開2、將虛擬網橋br0與物理網卡eth0綁定brctl addif br0 eth0ifconfig eth0 0.0.0.0 //網橋類似於交換機,此時...
  • 初學者筆記:linux的程式在發生異常情況下,會導致進程down,多數伺服器會設置生成core文件,本人在實際運營過程中發生過進程不斷core,被自動監控拉起,過段時間有core的情況,進而導致磁碟被撐滿,影響服務這裡對core稍作瞭解後core文件的大小限制:ulimit -c是可以改變core的...
  • 一、簡介 二、安裝 http://www.cnblogs.com/jadeboy/p/5132423.html 三、其他 1)Windows下效率必備軟體 http://www.jeffjade.com/2015/10/19/2015-10-18-Efficacious-win-software/#
  • Hacker Typer 讓你也能像電影中的黑客那樣寫代碼咯~~~
  • 2016-01-16 之前只是大概瞭解過c#語言,感覺掌握不牢靠.現在開始系統學習C#.現以該博客作為學習筆記,方便後續查看.C# 目標:系統掌握c#知識 時間:30天 範圍:C#基礎,Winform應用,C#高級,SQL及.net網頁.
  • WPF預設程式啟動:新建project後自動生成的App.xaml中指定程式啟動方式(StartupUri="MainWindow.xaml"),如下代碼所示,啟動MainWindow頁面 WPF用Main函數方式啟動程式:自己寫Main函數作為啟動點1.在WPF自動生成的App.cs文件中寫M.....
  • lock與C#多線程 lock 關鍵字將語句塊標記為臨界區,方法是獲取給定對象的互斥鎖,執行語句,然後釋放該鎖。簡單講就類似於 你去銀行辦理業務,一個櫃臺一次只能操作以為客戶,而如果你要到這個櫃臺辦理業務就必須等前面的人的業務完成,而彼此之間不會有交集。下麵通過具體的代碼來深入說明: using ....
  • PHP是動態類型的Web開發的腳本語言,PHP以頁面文件作為載入和運行的單元,PHP現在有了Composer作為開發包管理。1.使用Composer管理依賴自從.NET開發用了Nuget管理程式集依賴,我就再也離不開它了,幸虧Java中也有Maven管理jar包,雖然開源中國的鏡像太慢但還有ibib...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...