【JAVA併發】同步工具類

来源:http://www.cnblogs.com/chenpi/archive/2016/04/06/5358579.html
-Advertisement-
Play Games

同步工具類主要包括閉鎖(如CountDownLatch),柵欄(如CyclicBarrier),信號量(如Semaphore)和阻塞隊列(如LinkedBlockingQueue)等; 使用同步工具類可以協調線程的控制流; 同步工具類封裝了一些狀態,這些狀態決定線程是繼續執行還是等待,此外同步工具類 ...


同步工具類主要包括閉鎖(如CountDownLatch),柵欄(如CyclicBarrier),信號量(如Semaphore)和阻塞隊列(如LinkedBlockingQueue)等;

使用同步工具類可以協調線程的控制流;

同步工具類封裝了一些狀態,這些狀態決定線程是繼續執行還是等待,此外同步工具類還提供了修改狀態的方法;

下麵將簡單介紹以上同步工具類;

閉鎖

可以讓一個線程等待一組事件發生後(不一定要線程結束)繼續執行;

以CountDownLatch為例,內部包含一個計數器,一開始初始化為一個整數(事件個數),發生一個事件後,調用countDown方法,計數器減1,await用於等待計數器為0後繼續執行當前線程;

舉個例子如下,main線程等待其它子線程的事件發生後繼續執行main線程:

package concurrency;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TaskTest implements Runnable {

    private CountDownLatch latch;
    private int sleepTime;

    /**
     * 
     */
    public TaskTest(int sleepTime, CountDownLatch latch) {
        this.sleepTime = sleepTime;
        this.latch = latch;
    }

    /**
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
        try {
            CountDownLatchTest.print(" is running。");
            TimeUnit.MILLISECONDS.sleep(sleepTime);
            CountDownLatchTest.print(" finished。");
            //計數器減減
            latch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

public class CountDownLatchTest {
    public static void main(String[] args) {
        int count = 10;
        final CountDownLatch latch = new CountDownLatch(count);
        ExecutorService es = Executors.newFixedThreadPool(count);
        for (int i = 0; i < count; i++) {
            es.execute(new TaskTest((i + 1) * 1000, latch));
        }

        try {
            CountDownLatchTest.print(" waiting...");
            //主線程等待其它事件發生
            latch.await();
            //其它事件已發生,繼續執行主線程
            CountDownLatchTest.print(" continue。。。");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            es.shutdown();
        }
    }
    
    public static void print(String str){
        SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
        System.out.println("[" + dfdate.format(new Date()) + "]" + Thread.currentThread().getName() + str);
    }
}

結果列印如下:

[09:41:43]pool-1-thread-1 is running。
[09:41:43]pool-1-thread-6 is running。
[09:41:43]main waiting...
[09:41:43]pool-1-thread-10 is running。
[09:41:43]pool-1-thread-4 is running。
[09:41:43]pool-1-thread-5 is running。
[09:41:43]pool-1-thread-2 is running。
[09:41:43]pool-1-thread-3 is running。
[09:41:43]pool-1-thread-7 is running。
[09:41:43]pool-1-thread-8 is running。
[09:41:43]pool-1-thread-9 is running。
[09:41:44]pool-1-thread-1 finished。
[09:41:45]pool-1-thread-2 finished。
[09:41:46]pool-1-thread-3 finished。
[09:41:47]pool-1-thread-4 finished。
[09:41:48]pool-1-thread-5 finished。
[09:41:49]pool-1-thread-6 finished。
[09:41:50]pool-1-thread-7 finished。
[09:41:51]pool-1-thread-8 finished。
[09:41:52]pool-1-thread-9 finished。
[09:41:53]pool-1-thread-10 finished。
[09:41:53]main continue。。。

 此外,FutureTask也可用作閉鎖,其get方法會等待任務完成後返回結果,否則一直阻塞直到任務完成;

信號量

控制同時執行某個指定操作的數量,常用於實現資源池,如資料庫連接池,線程池...
以Semaphore為例,其內部維護一組資源,可以通過構造函數指定數目,其它線程在執行的時候,可以通過acquire方法獲取資源,有的話,繼續執行(使用結束後釋放資源),沒有資源的話將阻塞直到有其它線程調用release方法釋放資源;

舉個例子,如下代碼,十個線程競爭三個資源,一開始有三個線程可以直接運行,剩下的七個線程只能阻塞等到其它線程使用資源完畢才能執行;

package concurrency;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreTest {
    
    public static void print(String str){
        SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
        System.out.println("[" + dfdate.format(new Date()) + "]" + Thread.currentThread().getName() + str);
    }
    
    public static void main(String[] args) {
        // 線程數目
        int threadCount = 10;
        // 資源數目
        Semaphore semaphore = new Semaphore(3);
        
        ExecutorService es = Executors.newFixedThreadPool(threadCount);

        // 啟動若幹線程
        for (int i = 0; i < threadCount; i++)
            es.execute(new ConsumeResourceTask((i + 1) * 1000, semaphore));
    }
}

class ConsumeResourceTask implements Runnable {
    private Semaphore semaphore;
    private int sleepTime;

    /**
         * 
         */
    public ConsumeResourceTask(int sleepTime, Semaphore semaphore) {
        this.sleepTime = sleepTime;
        this.semaphore = semaphore;
    }

    public void run() {
        try {
            //獲取資源
            semaphore.acquire();
            SemaphoreTest.print(" 占用一個資源...");
            TimeUnit.MILLISECONDS.sleep(sleepTime);
            SemaphoreTest.print(" 資源使用結束,釋放資源");
            //釋放資源
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
[10:30:11]pool-1-thread-1 占用一個資源...
[10:30:11]pool-1-thread-2 占用一個資源...
[10:30:11]pool-1-thread-3 占用一個資源...
[10:30:12]pool-1-thread-1 資源使用結束,釋放資源
[10:30:12]pool-1-thread-4 占用一個資源...
[10:30:13]pool-1-thread-2 資源使用結束,釋放資源
[10:30:13]pool-1-thread-5 占用一個資源...
[10:30:14]pool-1-thread-3 資源使用結束,釋放資源
[10:30:14]pool-1-thread-8 占用一個資源...
[10:30:16]pool-1-thread-4 資源使用結束,釋放資源
[10:30:16]pool-1-thread-6 占用一個資源...
[10:30:18]pool-1-thread-5 資源使用結束,釋放資源
[10:30:18]pool-1-thread-9 占用一個資源...
[10:30:22]pool-1-thread-8 資源使用結束,釋放資源
[10:30:22]pool-1-thread-7 占用一個資源...
[10:30:22]pool-1-thread-6 資源使用結束,釋放資源
[10:30:22]pool-1-thread-10 占用一個資源...
[10:30:27]pool-1-thread-9 資源使用結束,釋放資源
[10:30:29]pool-1-thread-7 資源使用結束,釋放資源
[10:30:32]pool-1-thread-10 資源使用結束,釋放資源

柵欄

柵欄用於等待其它線程,且會阻塞自己當前線程;

所有線程必須同時到達柵欄位置後,才能繼續執行;

舉個例子如下:

package concurrency;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CyclicBarrierTaskTest implements Runnable {
    private CyclicBarrier cyclicBarrier;

    private int timeout;

    public CyclicBarrierTaskTest(CyclicBarrier cyclicBarrier, int timeout) {
        this.cyclicBarrier = cyclicBarrier;
        this.timeout = timeout;
    }

    @Override
    public void run() {
        TestCyclicBarrier.print(" 正在running...");
        try {
            TimeUnit.MILLISECONDS.sleep(timeout);
            TestCyclicBarrier.print(" 到達柵欄處,等待其它線程到達");
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }

        TestCyclicBarrier.print(" 所有線程到達柵欄處,繼續執行各自線程任務...");
    }
}

public class TestCyclicBarrier {

    public static void print(String str) {
        SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
        System.out.println("[" + dfdate.format(new Date()) + "]"
                + Thread.currentThread().getName() + str);
    }

    public static void main(String[] args) {
        int count = 5;
        
        ExecutorService es = Executors.newFixedThreadPool(count);

        CyclicBarrier barrier = new CyclicBarrier(count, new Runnable() {

            @Override
            public void run() {
                TestCyclicBarrier.print(" 所有線程到達柵欄處,可以在此做一些處理...");
            }
        });
        for (int i = 0; i < count; i++)
            es.execute(new CyclicBarrierTaskTest(barrier, (i + 1) * 1000));
    }

}
[11:07:00]pool-1-thread-2 正在running...
[11:07:00]pool-1-thread-1 正在running...
[11:07:00]pool-1-thread-5 正在running...
[11:07:00]pool-1-thread-3 正在running...
[11:07:00]pool-1-thread-4 正在running...
[11:07:01]pool-1-thread-1 到達柵欄處,等待其它線程到達
[11:07:02]pool-1-thread-2 到達柵欄處,等待其它線程到達
[11:07:03]pool-1-thread-3 到達柵欄處,等待其它線程到達
[11:07:04]pool-1-thread-4 到達柵欄處,等待其它線程到達
[11:07:05]pool-1-thread-5 到達柵欄處,等待其它線程到達
[11:07:05]pool-1-thread-5 所有線程到達柵欄處,可以在此做一些處理...
[11:07:05]pool-1-thread-1 所有線程到達柵欄處,繼續執行各自線程任務...
[11:07:05]pool-1-thread-2 所有線程到達柵欄處,繼續執行各自線程任務...
[11:07:05]pool-1-thread-5 所有線程到達柵欄處,繼續執行各自線程任務...
[11:07:05]pool-1-thread-3 所有線程到達柵欄處,繼續執行各自線程任務...
[11:07:05]pool-1-thread-4 所有線程到達柵欄處,繼續執行各自線程任務...

阻塞隊列

阻塞隊列提供了可阻塞的入隊和出對操作,如果隊列滿了,入隊操作將阻塞直到有空間可用,如果隊列空了,出隊操作將阻塞直到有元素可用;

隊列可以為有界和無界隊列,無界隊列不會滿,因此入隊操作將不會阻塞;

下麵將使用阻塞隊列LinkedBlockingQueue舉個生產者-消費者例子,生產者每隔1秒生產1個產品,然後有6個消費者在消費產品,可以發現,每隔1秒,只有一個消費者能夠獲取到產品消費,其它線程只能等待...

如下代碼:

package concurrency;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

//生產者
public class Producer implements Runnable {
    private final BlockingQueue<String> fileQueue;

    public Producer(BlockingQueue<String> queue) {
        this.fileQueue = queue;

    }

    public void run() {
        try {
            while (true) {
                TimeUnit.MILLISECONDS.sleep(1000);
                String produce = this.produce();
                System.out.println(Thread.currentThread() + "生產:" + produce);
                fileQueue.put(produce);
            }

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public String produce() {
        SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
        return dfdate.format(new Date());
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);

        for (int i = 0; i < 1; i++) {
            new Thread(new Producer(queue)).start();
        }
        for (int i = 0; i < 6; i++) {
            new Thread(new Consumer(queue)).start();
        }
    }
}

// 消費者
class Consumer implements Runnable {
    private final BlockingQueue<String> queue;

    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            while (true) {
                TimeUnit.MILLISECONDS.sleep(1000);
                System.out.println(Thread.currentThread() + "prepare 消費");
                System.out.println(Thread.currentThread() + "starting:"
                        + queue.take());
                System.out.println(Thread.currentThread() + "end 消費");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Thread[Thread-1,5,main]prepare 消費
Thread[Thread-3,5,main]prepare 消費
Thread[Thread-4,5,main]prepare 消費
Thread[Thread-2,5,main]prepare 消費
Thread[Thread-6,5,main]prepare 消費
Thread[Thread-5,5,main]prepare 消費
Thread[Thread-0,5,main]生產:11:36:36
Thread[Thread-1,5,main]starting:11:36:36
Thread[Thread-1,5,main]end 消費
Thread[Thread-1,5,main]prepare 消費
Thread[Thread-0,5,main]生產:11:36:37
Thread[Thread-4,5,main]starting:11:36:37
Thread[Thread-4,5,main]end 消費
Thread[Thread-4,5,main]prepare 消費
Thread[Thread-0,5,main]生產:11:36:38
Thread[Thread-3,5,main]starting:11:36:38
Thread[Thread-3,5,main]end 消費
...

 參考資料:java併發編程實戰


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 外鍵 創建: 方式1:在創建表時使用foreign key(欄位名) references 表名(主鍵); 方式2:通過修改表結構add foreign key(欄位名) references 表名(主鍵); 刪除: alter table 表名 drop foreign key 外鍵名; 如果查看 ...
  • 歷屆試題 蘭頓螞蟻 時間限制:1.0s 記憶體限制:256.0MB 時間限制:1.0s 記憶體限制:256.0MB 問題描述 蘭頓螞蟻,是於1986年,由克裡斯·蘭頓提出來的,屬於細胞自動機的一種。 平面上的正方形格子被填上黑色或白色。在其中一格正方形內有一隻“螞蟻”。 螞蟻的頭部朝向為:上下左右其中一 ...
  • HashMap 的性能因數 1. 容量:表示桶位的數量。 2. 初始容量: 表在創建是所擁有的桶位數。 如果你知道將要在HashMap存儲多少項,創建一個初始容量合適的HashMap將可以避免自動再散列的開銷 /** * The default initial capacity - MUST be ... ...
  • 說是解決,其實不是很完美的解決的,寫出來只是想記錄一下這個問題或者看一下有沒有哪位仁兄會的,能否知道一二。 下麵說說出現問題: 問題是這樣的,當我查詢一個一對多的實體的時候,工具直接就爆了,差不多我就猜到是哪裡死迴圈了,最後等了好久,查看原因,果然是堆溢出,再然後是jsckson的錯誤。那麼必然是序 ...
  • 最近加入了python部落,感覺裡面的刷題寶很有意思,玩了一下,知道了許多以前並不清楚的內置函數,然後感覺到快要記不住了,所以開始陳列一下 1.divmod(a,b):取a除以b的商和餘數,功效等價於(a//b, a%b); 2.dir():參數為函數名,類名。它會告訴我們對應函數包含有什麼參數 3 ...
  • 前兩天學習了兩種java操作excel的方法,對於網站需要創建、修改、導出excel文件,可以使用這兩種方法,第一種方法是使用JExcel,第二種方法是使用poi中的HSSF,前一種方法比較簡單。這裡做個總結。 1.JExcel,需要的jar包jxl.jar 只讀文件 Workbook wb=Wor ...
  • Spring 框架提供了構建 Web 應用程式的全功能 MVC 模塊。Spring MVC屬於SpringFrameWork的後續產品,已經融合在Spring Web Flow裡面。Spring MVC的易用性、功能強大等優點已經被越來越多的企業所接受,也成為一個使用廣泛的mvc框架。因此,尚學堂對 ...
  • 引言 本文是圍繞Linux udp api 構建一個簡易的多人聊天室.重點看思路,幫助我們加深 對udp開發中一些api瞭解.相對而言udp socket開發相比tcp socket開發註意的細節要少很多. 但是水也很深. 本文就當是一個demo整合幫助開發者回顧和繼續瞭解 linux udp開發的 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...