電腦程式的思維邏輯 (67) - 線程的基本協作機制 (上)

来源:http://www.cnblogs.com/swiftma/archive/2017/02/21/6421803.html
-Advertisement-
Play Games

本節和下節介紹線程的基本協作機制wait/notify,本節介紹協作的場景,wait/notify的基本用法和原理,以及如何實現生產者/消費者模式 ... ...


上節介紹了多線程之間競爭訪問同一個資源的問題及解決方案synchronized,我們提到,多線程之間除了競爭,還經常需要相互協作,本節就來介紹Java中多線程協作的基本機制wait/notify。

都有哪些場景需要協作?wait/notify是什麼?如何使用?實現原理是什麼?協作的核心是什麼?如何實現各種典型的協作場景?由於內容較多,我們分為上下兩節來介紹。

我們先來看看都有哪些協作的場景。

協作的場景

多線程之間需要協作的場景有很多,比如說:

  • 生產者/消費者協作模式:這是一種常見的協作模式,生產者線程和消費者線程通過共用隊列進行協作,生產者將數據或任務放到隊列上,而消費者從隊列上取數據或任務,如果隊列長度有限,在隊列滿的時候,生產者需要等待,而在隊列為空的時候,消費者需要等待。
  • 同時開始:類似運動員比賽,在聽到比賽開始槍響後同時開始,在一些程式,尤其是模擬模擬程式中,要求多個線程能同時開始。
  • 等待結束:主從協作模式也是一種常見的協作模式,主線程將任務分解為若幹個子任務,為每個子任務創建一個線程,主線程在繼續執行其他任務之前需要等待每個子任務執行完畢。
  • 非同步結果:在主從協作模式中,主線程手工創建子線程的寫法往往比較麻煩,一種常見的模式是將子線程的管理封裝為非同步調用,非同步調用馬上返回,但返回的不是最終的結果,而是一個一般稱為Promise或Future的對象,通過它可以在隨後獲得最終的結果。
  • 集合點:類似於學校或公司組團旅游,在旅游過程中有若幹集合點,比如出發集合點,每個人從不同地方來到集合點,所有人到齊後進行下一項活動,在一些程式,比如並行迭代計算中,每個線程負責一部分計算,然後在集合點等待其他線程完成,所有線程到齊後,交換數據和計算結果,再進行下一次迭代。

我們會探討如何實現這些協作場景,在此之前,我們先來瞭解協作的基本方法wait/notify。

wait/notify

我們知道,Java的根父類是Object,Java在Object類而非Thread類中,定義了一些線程協作的基本方法,使得每個對象都可以調用這些方法,這些方法有兩類,一類是wait,另一類是notify。

主要有兩個wait方法:

public final void wait() throws InterruptedException
public final native void wait(long timeout) throws InterruptedException;

一個帶時間參數,單位是毫秒,表示最多等待這麼長時間,參數為0表示無限期等待。一個不帶時間參數,表示無限期等待,實際就是調用wait(0)。在等待期間都可以被中斷,如果被中斷,會拋出InterruptedException,關於中斷及中斷處理,我們在下節介紹,本節暫時忽略該異常。

wait實際上做了什麼呢?它在等待什麼?上節我們說過,每個對象都有一把鎖和等待隊列,一個線程在進入synchronized代碼塊時,會嘗試獲取鎖,獲取不到的話會把當前線程加入等待隊列中,其實,除了用於鎖的等待隊列,每個對象還有另一個等待隊列,表示條件隊列,該隊列用於線程間的協作。調用wait就會把當前線程放到條件隊列上並阻塞,表示當前線程執行不下去了,它需要等待一個條件,這個條件它自己改變不了,需要其他線程改變。當其他線程改變了條件後,應該調用Object的notify方法:

public final native void notify();
public final native void notifyAll();

notify做的事情就是從條件隊列中選一個線程,將其從隊列中移除並喚醒,notifyAll和notify的區別是,它會移除條件隊列中所有的線程並全部喚醒。

我們來看個簡單的例子,一個線程啟動後,在執行一項操作前,它需要等待主線程給它指令,收到指令後才執行,代碼如下:

public class WaitThread extends Thread {
    private volatile boolean fire = false;

    @Override
    public void run() {
        try {
            synchronized (this) {
                while (!fire) {
                    wait();
                }
            }
            System.out.println("fired");
        } catch (InterruptedException e) {
        }
    }

    public synchronized void fire() {
        this.fire = true;
        notify();
    }

    public static void main(String[] args) throws InterruptedException {
        WaitThread waitThread = new WaitThread();
        waitThread.start();
        Thread.sleep(1000);
        System.out.println("fire");
        waitThread.fire();
    }
}

示例代碼中有兩個線程,一個是主線程,一個是WaitThread,協作的條件變數是fire,WaitThread等待該變數變為true,在不為true的時候調用wait,主線程設置該變數並調用notify。

兩個線程都要訪問協作的變數fire,容易出現競態條件,所以相關代碼都需要被synchronized保護。實際上,wait/notify方法只能在synchronized代碼塊內被調用,如果調用wait/notify方法時,當前線程沒有持有對象鎖,會拋出異常java.lang.IllegalMonitorStateException。

你可能會有疑問,如果wait必須被synchronzied保護,那一個線程在wait時,另一個線程怎麼可能調用同樣被synchronzied保護的notify方法呢?它不需要等待鎖嗎?我們需要進一步理解wait的內部過程,雖然是在synchronzied方法內,但調用wait時,線程會釋放對象鎖,wait的具體過程是:

  1. 把當前線程放入條件等待隊列,釋放對象鎖,阻塞等待,線程狀態變為WAITING或TIMED_WAITING
  2. 等待時間到或被其他線程調用notify/notifyAll從條件隊列中移除,這時,要重新競爭對象鎖
    • 如果能夠獲得鎖,線程狀態變為RUNNABLE,並從wait調用中返回
    • 否則,該線程加入對象鎖等待隊列,線程狀態變為BLOCKED,只有在獲得鎖後才會從wait調用中返回

線程從wait調用中返回後,不代表其等待的條件就一定成立了,它需要重新檢查其等待的條件,一般的調用模式是:

synchronized (obj) {
    while (條件不成立)
        obj.wait();
    ... // 執行條件滿足後的操作
}

比如,上例中的代碼是:

synchronized (this) {
    while (!fire) {
        wait();
    }
}

調用notify會把在條件隊列中等待的線程喚醒並從隊列中移除,但它不會釋放對象鎖,也就是說,只有在包含notify的synchronzied代碼塊執行完後,等待的線程才會從wait調用中返回。

簡單總結一下,wait/notify方法看上去很簡單,但往往難以理解wait等的到底是什麼,而notify通知的又是什麼,我們需要知道,它們與一個共用的條件變數有關,這個條件變數是程式自己維護的,當條件不成立時,線程調用wait進入條件等待隊列,另一個線程修改了條件變數後調用notify,調用wait的線程喚醒後需要重新檢查條件變數。從多線程的角度看,它們圍繞共用變數進行協作,從調用wait的線程角度看,它阻塞等待一個條件的成立。我們在設計多線程協作時,需要想清楚協作的共用變數和條件是什麼,這是協作的核心。接下來,我們通過一些場景來進一步理解wait/notify的應用,本節只介紹生產者/消費者模式,下節介紹更多模式。

生產者/消費者模式

在生產者/消費者模式中,協作的共用變數是隊列,生產者往隊列上放數據,如果滿了就wait,而消費者從隊列上取數據,如果隊列為空也wait。我們將隊列作為單獨的類進行設計,代碼如下:

static class MyBlockingQueue<E> {
    private Queue<E> queue = null;
    private int limit;

    public MyBlockingQueue(int limit) {
        this.limit = limit;
        queue = new ArrayDeque<>(limit);
    }

    public synchronized void put(E e) throws InterruptedException {
        while (queue.size() == limit) {
            wait();
        }
        queue.add(e);
        notifyAll();
    }

    public synchronized E take() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }
        E e = queue.poll();
        notifyAll();
        return e;
    }
}

MyBlockingQueue是一個長度有限的隊列,長度通過構造方法的參數進行傳遞,有兩個方法put和take。put是給生產者使用的,往隊列上放數據,滿了就wait,放完之後調用notifyAll,通知可能的消費者。take是給消費者使用的,從隊列中取數據,如果為空就wait,取完之後調用notifyAll,通知可能的生產者。

我們看到,put和take都調用了wait,但它們的目的是不同的,或者說,它們等待的條件是不一樣的,put等待的是隊列不為滿,而take等待的是隊列不為空,但它們都會加入相同的條件等待隊列。由於條件不同但又使用相同的等待隊列,所以要調用notifyAll而不能調用notify,因為notify只能喚醒一個線程,如果喚醒的是同類線程就起不到協調的作用。

只能有一個條件等待隊列,這是Java wait/notify機制的局限性,這使得對於等待條件的分析變得複雜,後續章節我們會介紹顯式的鎖和條件,它可以解決該問題。

一個簡單的生產者代碼如下所示:

static class Producer extends Thread {
    MyBlockingQueue<String> queue;

    public Producer(MyBlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        int num = 0;
        try {
            while (true) {
                String task = String.valueOf(num);
                queue.put(task);
                System.out.println("produce task " + task);
                num++;
                Thread.sleep((int) (Math.random() * 100));
            }
        } catch (InterruptedException e) {
        }
    }
}

Producer向共用隊列中插入模擬的任務數據。一個簡單的示例消費者代碼如下所示:

static class Consumer extends Thread {
    MyBlockingQueue<String> queue;

    public Consumer(MyBlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String task = queue.take();
                System.out.println("handle task " + task);
                Thread.sleep((int)(Math.random()*100));
            }
        } catch (InterruptedException e) {
        }
    }
}

主程式的示例代碼如下所示:

public static void main(String[] args) {
    MyBlockingQueue<String> queue = new MyBlockingQueue<>(10);
    new Producer(queue).start();
    new Consumer(queue).start();
}

運行該程式,會看到生產者和消費者線程的輸出交替出現。

我們實現的MyBlockingQueue主要用於演示,Java提供了專門的阻塞隊列實現,包括:

  • 介面BlockingQueue和BlockingDeque
  • 基於數組的實現類ArrayBlockingQueue
  • 基於鏈表的實現類LinkedBlockingQueue和LinkedBlockingDeque
  • 基於堆的實現類PriorityBlockingQueue

我們會在後續章節介紹這些類,在實際系統中,應該考慮使用這些類。

小結

本節介紹了Java中線程間協作的基本機制wait/notify,協作關鍵要想清楚協作的共用變數和條件是什麼,為進一步理解,本節針對生產者/消費者模式演示了wait/notify的用法。

下一節,我們來繼續探討其他協作模式。

(與其他章節一樣,本節所有代碼位於 https://github.com/swiftma/program-logic)

----------------

未完待續,查看最新文章,敬請關註微信公眾號“老馬說編程”(掃描下方二維碼),從入門到高級,深入淺出,老馬和你一起探索Java編程及電腦技術的本質。用心原創,保留所有版權。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、Parallel的使用 在Parallel下麵有三個常用的方法Invoke、For和ForEach。 1、Parallel.Invoke 執行Parallel.ParallelInvokeMethod()的結果: 可以看出正常調用Run1()和Run2()所花費的時間在5秒左右,而用Parall ...
  • 在ASP.NET MVC框架中,將視圖中的數據傳遞到控制器中,主要通過發送表單實現的。具體使用中,主要使用以下三種方法。 1.通過Request.Form讀取表單數據 表單代碼: 使用Request.Form提取表單數據: 2.通過FormCollection讀取表單數據 3、直接讀取表單數據對象 ...
  • 2017年我們將聚集在打磨我們的控制項,使wijmo更易於使用,增加一些重要的新控制項 ...
  • 今天更新了一個網站,新增了一個頁面,調用WebService,在測試環境好好的,部署到正式環境後就莫名報錯: 伺服器提交了協議衝突. Section=ResponseStatusLine 網上查了好多解決方案,有說加這個配置節的: 然而並沒有什麼卵用啊! 一個偶然的發現,讓我找到了真正的原因。 我在 ...
  • 今天一個兼職結束了,又要開始尋找新的兼職公司了 ,為了貼補家用啊,為了給兒子更好的生活加油! 抒情完畢進入正題,本篇文章要解決的問題是其實在開發微信支付,微信公眾號等回調地址必須是外網可訪問的80埠地址,這就導致很多開發上的不便,網上應該有很多教程做類似的了我這篇重覆造輪子一是記錄一下以備自己將來 ...
  • 數據驗證控制項 --之心 在ASP中進行表單數據驗證時,通常開發者必須自己編寫一套驗證的規則,然後自己將這些代碼拷貝到ASP代碼中對錶單進行驗證。這樣進行驗證的方式實在不太方便,幸運的是,ASP.NET解決了這個問題,這就是數據驗證Web控制項。 數據驗證控制項是ASP.NET中專門用來驗證表單用戶輸入的 ...
  • 20170220問題解析請點擊今日問題下方的“【Java每日一題】20170221”查看(問題解析在公眾號首發,公眾號ID:weknow619) 今日問題: 請問該程式運行結果是什麼?(點擊以下“【Java每日一題】20170221”查看20170220問題解析) 題目原發佈於公眾號、簡書:【Jav ...
  • 1. 一個錯誤釋放記憶體的例子 下麵的場景會有什麼錯? 一切看上去都是有序的。new匹配了一個delete。但有一些地方確實是錯了。程式的行為是未定義的。至少來說,stringArray指向的100個string對象中的99個看上去都不能被正確釋放,因為他們的析構函數可能永遠不會被調用。 2. 使用n ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...