死磕 java集合之DelayQueue源碼分析

来源:https://www.cnblogs.com/tong-yuan/archive/2019/04/28/DelayQueue.html
-Advertisement-
Play Games

DelayQueue是阻塞隊列嗎? DelayQueue的實現方式? DelayQueue主要用於什麼場景? ...


問題

(1)DelayQueue是阻塞隊列嗎?

(2)DelayQueue的實現方式?

(3)DelayQueue主要用於什麼場景?

簡介

DelayQueue是java併發包下的延時阻塞隊列,常用於實現定時任務。

繼承體系

qrcode

從繼承體系可以看到,DelayQueue實現了BlockingQueue,所以它是一個阻塞隊列。

另外,DelayQueue還組合了一個叫做Delayed的介面,DelayQueue中存儲的所有元素必須實現Delayed介面。

那麼,Delayed是什麼呢?

public interface Delayed extends Comparable<Delayed> {

    long getDelay(TimeUnit unit);
}

Delayed是一個繼承自Comparable的介面,並且定義了一個getDelay()方法,用於表示還有多少時間到期,到期了應返回小於等於0的數值。

源碼分析

主要屬性

// 用於控制併發的鎖
private final transient ReentrantLock lock = new ReentrantLock();
// 優先順序隊列
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用於標記當前是否有線程在排隊(僅用於取元素時)
private Thread leader = null;
// 條件,用於表示現在是否有可取的元素
private final Condition available = lock.newCondition();

從屬性我們可以知道,延時隊列主要使用優先順序隊列來實現,並輔以重入鎖和條件來控制併發安全。

因為優先順序隊列是無界的,所以這裡只需要一個條件就可以了。

還記得優先順序隊列嗎?點擊鏈接直達【死磕 java集合之PriorityQueue源碼分析

主要構造方法

public DelayQueue() {}

public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}

構造方法比較簡單,一個預設構造方法,一個初始化添加集合c中所有元素的構造方法。

入隊

因為DelayQueue是阻塞隊列,且優先順序隊列是無界的,所以入隊不會阻塞不會超時,因此它的四個入隊方法是一樣的。

public boolean add(E e) {
    return offer(e);
}

public void put(E e) {
    offer(e);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

入隊方法比較簡單:

(1)加鎖;

(2)添加元素到優先順序隊列中;

(3)如果添加的元素是堆頂元素,就把leader置為空,並喚醒等待在條件available上的線程;

(4)解鎖;

出隊

因為DelayQueue是阻塞隊列,所以它的出隊有四個不同的方法,有拋出異常的,有阻塞的,有不阻塞的,有超時的。

我們這裡主要分析兩個,poll()和take()方法。

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

poll()方法比較簡單:

(1)加鎖;

(2)檢查第一個元素,如果為空或者還沒到期,就返回null;

(3)如果第一個元素到期了就調用優先順序隊列的poll()彈出第一個元素;

(4)解鎖。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 堆頂元素
            E first = q.peek();
            // 如果堆頂元素為空,說明隊列中還沒有元素,直接阻塞等待
            if (first == null)
                available.await();
            else {
                // 堆頂元素的到期時間
                long delay = first.getDelay(NANOSECONDS);
                // 如果小於0說明已到期,直接調用poll()方法彈出堆頂元素
                if (delay <= 0)
                    return q.poll();
                
                // 如果delay大於0 ,則下麵要阻塞了
                
                // 將first置為空方便gc,因為有可能其它元素彈出了這個元素
                // 這裡還持有著引用不會被清理
                first = null; // don't retain ref while waiting
                // 如果前面有其它線程在等待,直接進入等待
                if (leader != null)
                    available.await();
                else {
                    // 如果leader為null,把當前線程賦值給它
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 等待delay時間後自動醒過來
                        // 醒過來後把leader置空並重新進入迴圈判斷堆頂元素是否到期
                        // 這裡即使醒過來後也不一定能獲取到元素
                        // 因為有可能其它線程先一步獲取了鎖並彈出了堆頂元素
                        // 條件鎖的喚醒分成兩步,先從Condition的隊列里出隊
                        // 再入隊到AQS的隊列中,當其它線程調用LockSupport.unpark(t)的時候才會真正喚醒
                        // 關於AQS我們後面會講的^^
                        available.awaitNanos(delay);
                    } finally {
                        // 如果leader還是當前線程就把它置為空,讓其它線程有機會獲取元素
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 成功出隊後,如果leader為空且堆頂還有元素,就喚醒下一個等待的線程
        if (leader == null && q.peek() != null)
            // signal()只是把等待的線程放到AQS的隊列裡面,並不是真正的喚醒
            available.signal();
        // 解鎖,這才是真正的喚醒
        lock.unlock();
    }
}

take()方法稍微要複雜一些:

(1)加鎖;

(2)判斷堆頂元素是否為空,為空的話直接阻塞等待;

(3)判斷堆頂元素是否到期,到期了直接調用優先順序隊列的poll()彈出元素;

(4)沒到期,再判斷前面是否有其它線程在等待,有則直接等待;

(5)前面沒有其它線程在等待,則把自己當作第一個線程等待delay時間後喚醒,再嘗試獲取元素;

(6)獲取到元素之後再喚醒下一個等待的線程;

(7)解鎖;

使用方法

說了那麼多,是不是還是不知道怎麼用呢?那怎麼能行,請看下麵的案例:

public class DelayQueueTest {
    public static void main(String[] args) {
        DelayQueue<Message> queue = new DelayQueue<>();

        long now = System.currentTimeMillis();

        // 啟動一個線程從隊列中取元素
        new Thread(()->{
            while (true) {
                try {
                    // 將依次列印1000,2000,5000,7000,8000
                    System.out.println(queue.take().deadline - now);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 添加5個元素到隊列中
        queue.add(new Message(now + 5000));
        queue.add(new Message(now + 8000));
        queue.add(new Message(now + 2000));
        queue.add(new Message(now + 1000));
        queue.add(new Message(now + 7000));
    }
}

class Message implements Delayed {
    long deadline;

    public Message(long deadline) {
        this.deadline = deadline;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return deadline - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        return String.valueOf(deadline);
    }
}

是不是很簡單,越早到期的元素越先出隊。

總結

(1)DelayQueue是阻塞隊列;

(2)DelayQueue內部存儲結構使用優先順序隊列;

(3)DelayQueue使用重入鎖和條件來控制併發安全;

(4)DelayQueue常用於定時任務;

彩蛋

java中的線程池實現定時任務是直接用的DelayQueue嗎?

當然不是,ScheduledThreadPoolExecutor中使用的是它自己定義的內部類DelayedWorkQueue,其實裡面的實現邏輯基本都是一樣的,只不過DelayedWorkQueue裡面沒有使用現成的PriorityQueue,而是使用數組又實現了一遍優先順序隊列,本質上沒有什麼區別。


歡迎關註我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。

qrcode


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1.1 你是如何認識新事物的? 一般而言,從過往的見多的事物中,總結->推斷->所屬類別->認知行為。 1.2 類(Class)的概念 類是對一組具有共同特征和行為的對象的抽象描述。 理解 [1]類是專門用於描述現實生活中的事物的。 [2]類描述的事物都具有共同的特征和行為。 [3]類就是我們通常所 ...
  • 本人是做java web開發的,已經工作兩年了。一直都是自己學習學技術,昨天突然靈光一現,覺得自己應該有一個自己的博客。以後我會不定時的在博客上更新一些自己學習掌握的技術。以前都沒有過這樣書面性的給別人講解技術的經驗,可能有什麼寫的不到位的地方,請大家能夠給我指出來說明一下,我會加以改正。大家一起努 ...
  • 思路 首先以那個環為框架,把所有的邊連出來。如果有兩條邊相交,那麼就把其中一條放到環外面去。 ...
  • 一直以來Base64演算法的加密解密都是使用sun.misc包下的BASE64Encoder及BASE64Decoder來進行的。但是這個類是sun公司的內部方法,並沒有在Java API中公開過,不屬於JDK標準庫範疇,但在JDK中包含了該類,可以直接使用。但是在Eclipse和MyEclipse中 ...
  • 題意 給定一張圖,對於每條邊給出一個運算符$(\&,|,\otimes)$和一個值$c(0 \le c \le 1)$。問能否 ...
  • 今日目標 能夠計算二進位和十進位數之間的互轉 能夠使用常見的DOS命令 理解Java語言的跨平臺實現原理 jvm是運行java程式的假想電腦,所有的java程式都運行在它上面。java編寫的軟體可以運行在任何操作系統上,這被稱為java跨平臺特性,是由jvm實現的,java程式運行在jvm上,jv ...
  • 所屬網站分類: 面試經典 > python 作者:外星人入侵 鏈接:http://www.pythonheidong.com/blog/article/67/ 來源:python黑洞網,專註python資源,python教程,python技術 Python支持5種數據類型: 1. Numbers(數 ...
  • 6.1 基本介紹 6.1.1 Scala語言是面向對象的 1) Java時面向對象的編程語言,由於歷史原因,Java中海存在著非面向對象的內容:基本類型,null,靜態方法等 2) Scala語言來自於Java,所以天生就是面向對象的語言,而且Scala時純粹的面相對象的語言,即在Scala中,一切 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...