Java併發系列[6]----Semaphore源碼分析

来源:https://www.cnblogs.com/liuyun1995/archive/2018/02/27/8474026.html
-Advertisement-
Play Games

Semaphore(信號量)是JUC包中比較常用到的一個類,它是AQS共用模式的一個應用,可以允許多個線程同時對共用資源進行操作,並且可以有效的控制併發數,利用它可以很好的實現流量控制。Semaphore提供了一個許可證的概念,可以把這個許可證看作公車車票,只有成功獲取車票的人才能夠上車,並且車 ...


Semaphore(信號量)是JUC包中比較常用到的一個類,它是AQS共用模式的一個應用,可以允許多個線程同時對共用資源進行操作,並且可以有效的控制併發數,利用它可以很好的實現流量控制。Semaphore提供了一個許可證的概念,可以把這個許可證看作公車車票,只有成功獲取車票的人才能夠上車,並且車票是有一定數量的,不可能毫無限制的發下去,這樣就會導致公交車超載。所以當車票發完的時候(公交車以滿載),其他人就只能等下一趟車了。如果中途有人下車,那麼他的位置將會空閑出來,因此如果這時其他人想要上車的話就又可以獲得車票了。利用Semaphore可以實現各種池,我們在本篇末尾將會動手寫一個簡易的資料庫連接池。首先我們來看一下Semaphore的構造器。

1 //構造器1
2 public Semaphore(int permits) {
3     sync = new NonfairSync(permits);
4 }
5 
6 //構造器2
7 public Semaphore(int permits, boolean fair) {
8     sync = fair ? new FairSync(permits) : new NonfairSync(permits);
9 }

Semaphore提供了兩個帶參構造器,沒有提供無參構造器。這兩個構造器都必須傳入一個初始的許可證數量,使用構造器1構造出來的信號量在獲取許可證時會採用非公平方式獲取,使用構造器2可以通過參數指定獲取許可證的方式(公平or非公平)。Semaphore主要對外提供了兩類API,獲取許可證和釋放許可證,預設的是獲取和釋放一個許可證,也可以傳入參數來同時獲取和釋放多個許可證。在本篇中我們只講每次獲取和釋放一個許可證的情況。

1.獲取許可證

 1 //獲取一個許可證(響應中斷)
 2 public void acquire() throws InterruptedException {
 3     sync.acquireSharedInterruptibly(1);
 4 }
 5 
 6 //獲取一個許可證(不響應中斷)
 7 public void acquireUninterruptibly() {
 8     sync.acquireShared(1);
 9 }
10 
11 //嘗試獲取許可證(非公平獲取)
12 public boolean tryAcquire() {
13     return sync.nonfairTryAcquireShared(1) >= 0;
14 }
15 
16 //嘗試獲取許可證(定時獲取)
17 public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
18     return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
19 }

上面的API是Semaphore提供的預設獲取許可證操作。每次只獲取一個許可證,這也是現實生活中較常遇到的情況。除了直接獲取還提供了嘗試獲取,直接獲取操作在失敗之後可能會阻塞線程,而嘗試獲取則不會。另外還需註意的是tryAcquire方法是使用非公平方式嘗試獲取的。在平時我們比較常用到的是acquire方法去獲取許可證。下麵我們就來看看它是怎樣獲取的。可以看到acquire方法裡面直接就是調用sync.acquireSharedInterruptibly(1),這個方法是AQS裡面的方法,我們在講AQS源碼系列文章的時候曾經講過,現在我們再來回顧一下。

 1 //以可中斷模式獲取鎖(共用模式)
 2 public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
 3     //首先判斷線程是否中斷, 如果是則拋出異常
 4     if (Thread.interrupted()) {
 5         throw new InterruptedException();
 6     }
 7     //1.嘗試去獲取鎖
 8     if (tryAcquireShared(arg) < 0) {
 9         //2. 如果獲取失敗則進人該方法
10         doAcquireSharedInterruptibly(arg);
11     }
12 }

acquireSharedInterruptibly方法首先就是去調用tryAcquireShared方法去嘗試獲取,tryAcquireShared在AQS裡面是抽象方法,FairSync和NonfairSync這兩個派生類實現了該方法的邏輯。FairSync實現的是公平獲取的邏輯,而NonfairSync實現的非公平獲取的邏輯。

 1 abstract static class Sync extends AbstractQueuedSynchronizer {
 2     //非公平方式嘗試獲取
 3     final int nonfairTryAcquireShared(int acquires) {
 4         for (;;) {
 5             //獲取可用許可證
 6             int available = getState();
 7             //獲取剩餘許可證
 8             int remaining = available - acquires;
 9             //1.如果remaining小於0則直接返回remaining
10             //2.如果remaining大於0則先更新同步狀態再返回remaining
11             if (remaining < 0 || compareAndSetState(available, remaining)) {
12                 return remaining;
13             }
14         }
15     }
16 }
17 
18 //非公平同步器
19 static final class NonfairSync extends Sync {
20     private static final long serialVersionUID = -2694183684443567898L;
21 
22     NonfairSync(int permits) {
23         super(permits);
24     }
25 
26     //嘗試獲取許可證
27     protected int tryAcquireShared(int acquires) {
28         return nonfairTryAcquireShared(acquires);
29     }
30 }
31 
32 //公平同步器
33 static final class FairSync extends Sync {
34     private static final long serialVersionUID = 2014338818796000944L;
35 
36     FairSync(int permits) {
37         super(permits);
38     }
39 
40     //嘗試獲取許可證
41     protected int tryAcquireShared(int acquires) {
42         for (;;) {
43             //判斷同步隊列前面有沒有人排隊
44             if (hasQueuedPredecessors()) {
45                 //如果有的話就直接返回-1,表示嘗試獲取失敗
46                 return -1;
47             }
48             //獲取可用許可證
49             int available = getState();
50             //獲取剩餘許可證
51             int remaining = available - acquires;
52             //1.如果remaining小於0則直接返回remaining
53             //2.如果remaining大於0則先更新同步狀態再返回remaining
54             if (remaining < 0 || compareAndSetState(available, remaining)) {
55                 return remaining;
56             }
57         }
58     }
59 }

這裡需要註意的是NonfairSync的tryAcquireShared方法直接調用的是nonfairTryAcquireShared方法,這個方法是在父類Sync裡面的。非公平獲取鎖的邏輯是先取出當前同步狀態(同步狀態表示許可證個數),將當前同步狀態減去參入的參數,如果結果不小於0的話證明還有可用的許可證,那麼就直接使用CAS操作更新同步狀態的值,最後不管結果是否小於0都會返回該結果值。這裡我們要瞭解tryAcquireShared方法返回值的含義,返回負數表示獲取失敗,零表示當前線程獲取成功但後續線程不能再獲取,正數表示當前線程獲取成功並且後續線程也能夠獲取。我們再來看acquireSharedInterruptibly方法的代碼。

 1 //以可中斷模式獲取鎖(共用模式)
 2 public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
 3     //首先判斷線程是否中斷, 如果是則拋出異常
 4     if (Thread.interrupted()) {
 5         throw new InterruptedException();
 6     }
 7     //1.嘗試去獲取鎖
 8     //負數:表示獲取失敗
 9     //零值:表示當前線程獲取成功, 但是後繼線程不能再獲取了
10     //正數:表示當前線程獲取成功, 並且後繼線程同樣可以獲取成功
11     if (tryAcquireShared(arg) < 0) {
12         //2. 如果獲取失敗則進人該方法
13         doAcquireSharedInterruptibly(arg);
14     }
15 }

如果返回的remaining小於0的話就代表獲取失敗,因此tryAcquireShared(arg) < 0就為true,所以接下來就會調用doAcquireSharedInterruptibly方法,這個方法我們在講AQS的時候講過,它會將當前線程包裝成結點放入同步隊列尾部,並且有可能掛起線程。這也是當remaining小於0時線程會排隊阻塞的原因。而如果返回的remaining>=0的話就代表當前線程獲取成功,因此tryAcquireShared(arg) < 0就為flase,所以就不會再去調用doAcquireSharedInterruptibly方法阻塞當前線程了。以上是非公平獲取的整個邏輯,而公平獲取時僅僅是在此之前先去調用hasQueuedPredecessors方法判斷同步隊列是否有人在排隊,如果有的話就直接return -1表示獲取失敗,否則才繼續執行下麵和非公平獲取一樣的步驟。

2.釋放許可證

1 //釋放一個許可證
2 public void release() {
3     sync.releaseShared(1);
4 }

調用release方法是釋放一個許可證,它的操作很簡單,就調用了AQS的releaseShared方法,我們來看看這個方法。

 1 //釋放鎖的操作(共用模式)
 2 public final boolean releaseShared(int arg) {
 3     //1.嘗試去釋放鎖
 4     if (tryReleaseShared(arg)) {
 5         //2.如果釋放成功就喚醒其他線程
 6         doReleaseShared();
 7         return true;
 8     }
 9     return false;
10 }

AQS的releaseShared方法首先調用tryReleaseShared方法嘗試釋放鎖,這個方法的實現邏輯在子類Sync裡面。

 1 abstract static class Sync extends AbstractQueuedSynchronizer {
 2     ...
 3     //嘗試釋放操作
 4     protected final boolean tryReleaseShared(int releases) {
 5         for (;;) {
 6             //獲取當前同步狀態
 7             int current = getState();
 8             //將當前同步狀態加上傳入的參數
 9             int next = current + releases;
10             //如果相加結果小於當前同步狀態的話就報錯
11             if (next < current) {
12                 throw new Error("Maximum permit count exceeded");
13             }
14             //以CAS方式更新同步狀態的值, 更新成功則返回true, 否則繼續迴圈
15             if (compareAndSetState(current, next)) {
16                 return true;
17             }
18         }
19     }
20     ...
21 }

可以看到tryReleaseShared方法裡面採用for迴圈進行自旋,首先獲取同步狀態,將同步狀態加上傳入的參數,然後以CAS方式更新同步狀態,更新成功就返回true並跳出方法,否則就繼續迴圈直到成功為止,這就是Semaphore釋放許可證的流程。

3.動手寫個連接池

Semaphore代碼並沒有很複雜,常用的操作就是獲取和釋放一個許可證,這些操作的實現邏輯也都比較簡單,但這並不妨礙Semaphore的廣泛應用。下麵我們就來利用Semaphore實現一個簡單的資料庫連接池,通過這個例子希望讀者們能更加深入的掌握Semaphore的運用。

 1 public class ConnectPool {
 2     
 3     //連接池大小
 4     private int size;
 5     //資料庫連接集合
 6     private Connect[] connects;
 7     //連接狀態標誌
 8     private boolean[] connectFlag;
 9     //剩餘可用連接數
10     private volatile int available;
11     //信號量
12     private Semaphore semaphore;
13     
14     //構造器
15     public ConnectPool(int size) {  
16         this.size = size;
17         this.available = size;
18         semaphore = new Semaphore(size, true);
19         connects = new Connect[size];
20         connectFlag = new boolean[size];
21         initConnects();
22     }
23     
24     //初始化連接
25     private void initConnects() {
26         //生成指定數量的資料庫連接
27         for(int i = 0; i < this.size; i++) {
28             connects[i] = new Connect();
29         }
30     }
31     
32     //獲取資料庫連接
33     private synchronized Connect getConnect(){  
34         for(int i = 0; i < connectFlag.length; i++) {
35             //遍歷集合找到未使用的連接
36             if(!connectFlag[i]) {
37                 //將連接設置為使用中
38                 connectFlag[i] = true;
39                 //可用連接數減1
40                 available--;
41                 System.out.println("【"+Thread.currentThread().getName()+"】以獲取連接      剩餘連接數:" + available);
42                 //返回連接引用
43                 return connects[i];
44             }
45         }
46         return null;
47     }
48     
49     //獲取一個連接
50     public Connect openConnect() throws InterruptedException {
51         //獲取許可證
52         semaphore.acquire();
53         //獲取資料庫連接
54         return getConnect();
55     }
56     
57     //釋放一個連接
58     public synchronized void release(Connect connect) {  
59         for(int i = 0; i < this.size; i++) {
60             if(connect == connects[i]){
61                 //將連接設置為未使用
62                 connectFlag[i] = false;
63                 //可用連接數加1
64                 available++;
65                 System.out.println("【"+Thread.currentThread().getName()+"】以釋放連接      剩餘連接數:" + available);
66                 //釋放許可證
67                 semaphore.release();
68             }
69         }
70     }
71     
72     //剩餘可用連接數
73     public int available() {
74         return available;
75     }
76     
77 }

測試代碼:

 1 public class TestThread extends Thread {
 2     
 3     private static ConnectPool pool = new ConnectPool(3);
 4     
 5     @Override
 6     public void run() {
 7         try {
 8             Connect connect = pool.openConnect();
 9             Thread.sleep(100);  //休息一下
10             pool.release(connect);
11         } catch (InterruptedException e) {
12             e.printStackTrace();
13         }
14     }
15     
16     public static void main(String[] args) {
17         for(int i = 0; i < 10; i++) {
18             new TestThread().start();
19         }
20     }
21 
22 }

測試結果:

我們使用一個數組來存放資料庫連接的引用,在初始化連接池的時候會調用initConnects方法創建指定數量的資料庫連接,並將它們的引用存放到數組中,此外還有一個相同大小的數組來記錄連接是否可用。每當外部線程請求獲取一個連接時,首先調用semaphore.acquire()方法獲取一個許可證,然後將連接狀態設置為使用中,最後返回該連接的引用。許可證的數量由構造時傳入的參數決定,每調用一次semaphore.acquire()方法許可證數量減1,當數量減為0時說明已經沒有連接可以使用了,這時如果其他線程再來獲取就會被阻塞。每當線程釋放一個連接的時候會調用semaphore.release()將許可證釋放,此時許可證的總量又會增加,代表可用的連接數增加了,那麼之前被阻塞的線程將會醒來繼續獲取連接,這時再次獲取就能夠成功獲取連接了。測試示例中初始化了一個3個連接的連接池,我們從測試結果中可以看到,每當線程獲取一個連接剩餘的連接數將會減1,等到減為0時其他線程就不能再獲取了,此時必須等待一個線程將連接釋放之後才能繼續獲取。可以看到剩餘連接數總是在0到3之間變動,說明我們這次的測試是成功的。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • css三角形 ...
  • 前言 ES6新增了數據類型Set,它是一種類似數組的數據結構。但它和數組的不同之處在於它的成員都是唯一的,也就是說可以用來去除數組重覆成員。 Set本身是一個構造函數用來生成Set數據結構。 const s=new Set(); 使用add()添加成員。也可以在構造函數中傳入數組作為參數 const ...
  • 模式定義 定義一系列演算法,分別封裝起來,讓它們之間可以呼死去那個替換,此模式讓演算法變化,不會影響到使用演算法的客戶 類圖定義 示例 示例來自於Head First上的鴨子例子,一個鴨子的系統,系統中會出現不同的鴨子,一邊游泳一邊叫。綠頭鴨子會飛,會游泳,正常呱呱叫,橡皮鴨子不會飛不會游泳吱吱叫。後期可 ...
  • 工作流模塊 1.模型管理 :web線上流程設計器、預覽流程xml、導出xml、部署流程 2.流程管理 :導入導出流程資源文件、查看流程圖、根據流程實例反射出流程模型、激活掛起 3.運行中流程:查看流程信息、當前任務節點、當前流程圖、作廢暫停流程、指派待辦人 4.歷史的流程:查看流程信息、流程用時、流 ...
  • 說到面向對象這個破玩意,曾經一度我都處於很懵逼的狀態,那麼面向對象究竟是什麼呢?其實說白了,所謂面向對象,就是基於類這個概念,來實現封裝、繼承和多態的一種編程思想罷了。今天我們就來說一下這其中繼承的問題。 好,先不直接上代碼,而是反手來一波文字說明,捋一捋思路。 曾經一段時間因為javascript ...
  • 各行各業,各個領域,各個渠道,都需要有一系列的完整的風險控制,以保證事情向好的方向發展,而免受不可預估的經濟和財產損失而綽手不及。這時候一套完備的風控系統應運而生,以解決實際在生產業務中的各種難題。作為事物的主體,可以採取各種措施和方法,消滅或減少風險事件發生的各種可能性,或減少風險事件發生時造成的... ...
  • Learning to Rank,即排序學習,簡稱為 L2R,它是構建排序模型的機器學習方法,在信息檢索、自然語言處理、數據挖掘等場景中具有重要的作用。其達到的效果是:給定一組文檔,對任意查詢請求給出反映文檔相關性的文檔排序。本文簡單介紹一下 L2R 的基本演算法及評價指標。 背景 隨著互聯網的快速發 ...
  • Description Siruseri 城中的道路都是單向的。不同的道路由路口連接。按照法律的規定, 在每個路口都設立了一個 Siruser i 銀行的 ATM 取款機。令人奇怪的是,Siruseri 的酒吧也都設在路口,雖然並不是每個路口都設有酒吧。Bandit ji 計劃實施 Siruseri ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...