併發工具類Phaser

来源:https://www.cnblogs.com/jtea/archive/2023/08/11/17622139.html
-Advertisement-
Play Games

# 前言 在面試這一篇我們介紹過[CountDownLatch和CyclicBarrier](https://github.com/jmilktea/jtea/blob/master/%E9%9D%A2%E8%AF%95/CountDownLatch%E5%92%8CCyclicBarrier.md ...


前言

在面試這一篇我們介紹過CountDownLatch和CyclicBarrier,它們都是jdk1.5提供的多線程併發控制類,內部都是用AQS這個同步框架實現。
在我們的實際項目中,有很多場景是需要從資料庫查詢一批數據,多線池執行某些操作,並且要統計結果,我們對這個過程做了一些封裝,由於要統計結果,所以需要等所有任務都處理完成,我們用到了CountDownLatch實現同步。偽代碼如下:

        ExecuteInstance ei = ExecuteInstance.build(myExecutor); //線程池
		
        //迴圈
        LoopShutdown.build("myTask").loop(() -> {

            //不斷從數據獲取數據
            List<Task> list = getFromDb();
            
            //設置countdownlatch
  	    ei.setCountDownSize(list.size());

	    list.forEach(item -> ei.execute(() -> {
		//提交到線程池執行,並且統計
	    }));
            
            //等待這一批做完
	    ei.await();
		
	});

        //內部使用了CountDownLatch await()
	return ei.awaitResult();

代碼很簡單,容易理解。不過後來有同學提到每次都要setCountDownSize() + await() 這套組合太麻煩,能不能省略這兩步呢。另外也不夠靈活,有些場景不能提前知道要處理的數據總數,例如從迭代器遍曆數據,Iterator介面並沒有size方法可以獲取到總數。

那怎麼實現這個功能呢?就是本篇要介紹的Phaser。

Phaser原理

Phaser類是jdk7提供的,可重用的,同步的,在功能上和CountDownLatch,CyclicBarrier類似,但更加靈活的類。
"phaser" google翻譯一下是:"移相器"的意思,完全不知道是什麼~,不過"phase"是階段的意思,還是能從名字瞭解到一些信息。

Phaser運行機制:

  • Registration(註冊)
    跟其他barrier不同,在phaser上註冊的parties會隨著時間的變化而變化。任務可以隨時註冊(使用方法register,bulkRegister註冊,或者由構造器確定初始parties),並且在任何抵達點可以隨意地撤銷註冊(方法arriveAndDeregister)。就像大多數基本的同步結構一樣,註冊和撤銷隻影響內部計數;不會創建更深的內部記錄,所以任務不能查詢他們是否已經註冊。(不過,可以通過繼承來實現類似的記錄)
    可以動態的註冊是它的特點之一,我們知道CountDownLatch之類的在開始就需要指定一個計數,並且不能更改,而Phaser可以開始指定,也可以運行時更改。

  • Synchronization(同步機制)
    和CyclicBarrier一樣,Phaser也可以重覆await。方法arriveAndAwaitAdvance的效果類似CyclicBarrier.await。phaser的每一代都有一個相關的phase number,初始值為0,當所有註冊的任務都到達phaser時phase+1,到達最大值(Integer.MAX_VALUE)之後清零。使用phase number可以獨立控制到達phaser和等待其他線程的動作,通過下麵兩種類型的方法:

    Arrival(到達機制) arrive和arriveAndDeregister方法記錄到達狀態。這些方法不會阻塞,但是會返回一個相關的arrival phase number;也就是說,phase number用來確定到達狀態。當所有任務都到達給定phase時,可以執行一個可選的函數,這個函數通過重寫onAdvance方法實現,通常可以用來控制終止狀態。重寫此方法類似於為CyclicBarrier提供一個barrierAction,但比它更靈活。

    Waiting(等待機制) awaitAdvance方法需要一個表示arrival phase number的參數,並且在phaser前進到與給定phase不同的phase時返回。和CyclicBarrier不同,即使等待線程已經被中斷,awaitAdvance方法也會一直等待。中斷狀態和超時時間同樣可用,但是當任務等待中斷或超時後未改變phaser的狀態時會遭遇異常。如果有必要,在方法forceTermination之後可以執行這些異常的相關的handler進行恢復操作,Phaser也可能被ForkJoinPool中的任務使用,這樣在其他任務阻塞等待一個phase時可以保證足夠的並行度來執行任務。

  • Termination(終止機制)
    可以用isTerminated方法檢查phaser的終止狀態。在終止時,所有同步方法立刻返回一個負值。在終止時嘗試註冊也沒有效果。當調用onAdvance返回true時Termination被觸發。當deregistration操作使已註冊的parties變為0時,onAdvance的預設實現就會返回true。也可以重寫onAdvance方法來定義終止動作。forceTermination方法也可以釋放等待線程並且允許它們終止。

  • Tiering(分層結構)
    Phaser支持分層結構(樹狀構造)來減少競爭。註冊了大量parties的Phaser可能會因為同步競爭消耗很高的成本, 因此可以設置一些子Phaser來共用一個通用的parent。這樣的話即使每個操作消耗了更多的開銷,但是會提高整體吞吐量。在一個分層結構的phaser里,子節點phaser的註冊和取消註冊都通過父節點管理。子節點phaser通過構造或方法register、bulkRegister進行首次註冊時,在其父節點上註冊。子節點phaser通過調用arriveAndDeregister進行最後一次取消註冊時,也在其父節點上取消註冊。
    這也是它的主要亮點之一,這一點很像ConcurrentHashMap(對HashTable)和LongAdder(對AtomicLong),通過分散熱點來降低資源競爭,提升併發效率。

  • Monitoring(狀態監控)
    由於同步方法可能只被已註冊的parties調用,所以phaser的當前狀態也可能被任何調用者監控。在任何時候,可以通過getRegisteredParties獲取parties數,其中getArrivedParties方法返回已經到達當前phase的parties數。當剩餘的parties(通過方法getUnarrivedParties獲取)到達時,phase進入下一代。這些方法返回的值可能只表示短暫的狀態,所以一般來說在同步結構里並沒有啥卵用。

CountDownLatch和CyclicBarrier都非常簡單,從Phaser提供的api數量就可以看出為什麼說它更加靈活,show me the code,接下來我們通過幾個例子感受一下。

Phaser例子

例子1:子線程會等全部子線程達到後才開始執行,實現類似CyclicBarrier的效果。

	@Test
	public void test1() throws InterruptedException {
		List<Runnable> list = Lists.newArrayList();
		for (int i = 0; i < 10; i++) {
			final int j = i;
			list.add(() -> System.out.println(j));
		}

		final Phaser phaser = new Phaser(); // "1" to register self
		// create and start threads
		int i = 0;
		for (final Runnable task : list) {
			i++;
			final int j = i;
			phaser.register();
			new Thread(() -> {
				try {
					Thread.sleep(j * 1000);
				} catch (InterruptedException e) {
				}
				//全部子線程到達後才開始執行
				phaser.arriveAndAwaitAdvance(); // await all creation
				task.run();
			}).start();
		}
		Thread.sleep(15000);
	}

例子2:task會迴圈做3次,通過重寫onAdvance可以控制phaser結束的條件。

    	@Test
	public void test2() throws InterruptedException {
		//重覆做3次
		int iterations = 3;
		List<Runnable> list = Lists.newArrayList();
		for (int i = 0; i < 2; i++) {
			final int j = i;
			list.add(() -> System.out.println(j));
		}

		final Phaser phaser = new Phaser() {			
			//每做一次,phase+1,該方法返回true,就會結束
			protected boolean onAdvance(int phase, int registeredParties) {
				return phase > iterations || registeredParties == 0;
			}
		};
		phaser.register();
		for (final Runnable task : list) {
			phaser.register();
			new Thread(() -> {
				do {
					task.run();
					phaser.arriveAndAwaitAdvance();
				} while (!phaser.isTerminated());
			}).start();
		}
		phaser.arriveAndDeregister(); // deregister self, don't wait  
		Thread.sleep(5000);
	}

例子3:創建多個phaser,並關聯到父phaser上,就是上面提到的分層結構。

    	@Test
	public void test3() {
		Phaser parent = new Phaser(1);
		Phaser phaser1 = new Phaser(parent);
		Phaser phaser2 = new Phaser(parent);

		for (int i = 0; i < 20; i++) {
			final int j = i;
			if (i < 10) {
				phaser1.register();
				new Thread(() -> {
					try {
						Thread.sleep(1000);
						phaser1.arriveAndAwaitAdvance(); // await all creation
						System.out.println(j);
					} catch (InterruptedException e) {
					}
				}).start();
			} else if (i < 20) {
				phaser2.register();
				new Thread(() -> {
					try {
						Thread.sleep(10000);
						phaser2.arriveAndAwaitAdvance(); // await all creation
						System.out.println(j);
					} catch (InterruptedException e) {
					}
				}).start();
			}
		}
		parent.arriveAndAwaitAdvance();
		System.out.println("done");
	}

例子4:使用Phaser改寫我們的代碼,如下:

    	//維護一個Phaser    
	public static ExecuteInstance buildWithPhaser(Executor executor) {
		ExecuteInstance ei = new ExecuteInstance();
        	ei.executor = executor;
		ei.phaser = new Phaser(1);        
		return ei;
	}

    	//提交線程池前註冊一下
    	public void executeRR(Callable<ReturnResult> task, Consumer<Exception> exceptionHandler, int batch) {
		phaser.register();
		executor.execute(() -> executeStatistics(task, exceptionHandler, batch));		
	}

    	//執行後deregister一下
    	private void executeStatistics(Callable<ReturnResult> task, Consumer<Exception> exceptionHandler, int batch) {
		ReturnResult result = ReturnResult.NONE;
		try {
        	    	//任務處理
			result = task.call();
		} catch (Exception e) {
			if (statistics) {
				counter.incrException(batch);
			}
			if (exceptionHandler != null) {
				//自定義異常處理
				try {
					exceptionHandler.accept(e);
				} catch (Exception he) {
				}
			}
		} finally {
			phaser.arriveAndDeregister(); //deregister   
			if (statistics) {
				if (ReturnResult.SUCCESS.equals(result)) {
					counter.incrSuccess(batch);
				} else if (ReturnResult.FAIL.equals(result)) {
					counter.incrFail(batch);
				} else if (ReturnResult.FILTER.equals(result)) {
					counter.incrFilter(batch);
				}
			}
		}
	}

    	//等待結果
    	public ExecuteResult awaitResult() {
		phaser.arriveAndAwaitAdvance();
		return getExecuteResult();
    	}

使用就非常簡單了

	ExecuteInstance ei = ExecuteInstance.buildWithPhaser(myExecutor); //線程池
		
    	//迴圈
     	LoopShutdown.build("myTask").loop(() -> {

        	//不斷從數據獲取數據
        	List<Task> list = getFromDb();            

		list.forEach(item -> ei.execute(() -> {
			//提交到線程池執行,並且統計
		}));        	
	});

	return ei.awaitResult();

總結

Phaser是jkd7後提供的同步工具類,它底層並沒有使用AQS同步工具。相比CountDownLatch等它提供了更豐富的功能,但也意味著它更複雜,需要更多的資源,一些簡單的場景CountDownLatch等工具類能滿足的就使用它們即可,考慮性能,還有靈活性時才考慮使用Phaser,如筆者的場景使用Phaser就更加適合。

更多分享,歡迎關註我的github:https://github.com/jmilktea/jtea


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 我需要獲得新聞,然後tts,在每天上班的路上可以聽一下。具體的方案後期我也會做一次分享。先看我喜歡的萬能的老路:獲得html內容-> python的工具庫解析,獲得元素中的內容,完成。 好家伙,我知道我爬取失敗了。一堆js代碼,看得我煩。我一去看頁面發現:原來新聞的獲得是走的介面,然後js插入文檔的 ...
  • 具體報錯為: Error attempting to get column 'DISEASENAME' from result set. Cause: java.sql.SQLFeatureNotSupportedException: 這個 org.postgresql.jdbc.PgResultS ...
  • 來源:blog.csdn.net/zjhred/article/details/84976734 在文章的開頭,先說下NPE問題,NPE問題就是,我們在開發中經常碰到的NullPointerException。假設我們有兩個類,他們的UML類圖如下圖所示 ![](https://img2023.cn ...
  • HotSpot的演算法實現 HotSpot的演算法實現概要 1、枚舉根節點 由於目前的主流Java虛擬機使用的都是準確式GC(這個概念在第1章介紹Exact VM對Classic VM的改進時講過),所以當執行系統停頓下來後,並不需要一個不漏地檢查完所有執行上下文和全局的引用位置,虛擬機應當是有辦法直接 ...
  • 問題:在Windows環境下部署java的jar包,若有多個服務同時啟動,很難找到相應服務重啟。每次都重啟全部服務很麻煩。應用場景大多用於部署測試。 適用:jar部署,war部署不適用。 解決方案:找到相應jar服務關閉並重啟。 註意: 1、正確設置埠,jar服務運行的埠; 2、正確設置jar文 ...
  • 數據類型是編程語言中的一個重要概念,它定義了數據的類型和提供了特定的操作和方法。在 python 中,數據類型的作用是將不同類型的數據進行分類和定義,例如數字、字元串、列表、元組、集合、字典等。這些數據類型不僅定義了數據的類型,還為數據提供了一些特定的操作和方法,例如字元串支持連接和分割,列表支持排... ...
  • ## 依賴導入 ```xml org.hibernate.orm hibernate-core 6.2.7.Final com.mysql mysql-connector-j 8.0.33 ``` ## 配置文件 ```xml com.mysql.cj.jdbc.Driver jdbc:mysql: ...
  • 在實際開發過程中,我們可能會遇到併發寫文件的場景,如果處理不當很可能出現文件內容亂序問題。下麵我們通過一個示常式序描述這一過程並給出解決該問題的方法。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...