Java併發編程實踐讀書筆記(2)多線程基礎組件

来源:https://www.cnblogs.com/huqiaoblog/archive/2017/12/26/8117363.html
-Advertisement-
Play Games

同步容器 同步容器是指那些對所有的操作都進行加鎖(synchronize)的容器。比如Vector、HashTable和Collections.synchronizedXXX返回系列對象: 可以看到,它的絕大部分方法都被加了同步(帶個小時鐘圖標)。 雖然Vector這麼勞神費力地搞了這麼多同步方法, ...


同步容器

同步容器是指那些對所有的操作都進行加鎖(synchronize)的容器。比如Vector、HashTable和Collections.synchronizedXXX返回系列對象:

可以看到,它的絕大部分方法都被加了同步(帶個小時鐘圖標)。

雖然Vector這麼勞神費力地搞了這麼多同步方法,但在最終使用的時候它並不一定真的“安全”。

同步容器的複合操作不安全

雖然Vector的方法增加了同步,但是像下麵這種“先檢查再操作”複合操作其實是不安全的:

    //兩個同步的原子操作合在一起就不再具有原子性了
    public void getLast(Vector vector) {
        int size = vector.size();
        vector.get(size);
    }

啊?不是說Vector和HashTable是線程安全的嗎?所以,以後再聽說某個類是線程安全的,不能就覺得萬事大吉了,應該留個心想想其安全的真正含義。

Vector和HashTable這些類的線程安全指的是它所提供的單個方法具有原子性,一個線程訪問的時候其他線程不能訪問。在進行覆核操作時還需要咱們自己去保證線程安全:

public void getLast(Vector vector) {
        //客戶端顯式鎖保證符合操作的同步
        synchronized (vector) {
            int size = vector.size();
            vector.get(size);
        }
}

這種不安全的問題在遍歷集合的時候仍然存在。Vector能做的就是在出現多線程訪問導致集合內容衝突時給一個異常提醒:

final void checkForComodification() {
            if (modCount != expectedModCount)
                throw new ConcurrentModificationException();
        }

一定要明白,ConcurrentModificationException異常的真正目的其實是在提醒咱的系統中存在多線程安全問題,需要我們去解決。不解決程式也能跑,但是指不定那天就見鬼了,這要靠運氣。

書中還指出,像Vector這樣把鎖隱藏在代碼端的設計,會導致客戶端經常忘記去同步。即“狀態與保護它的代碼越遠,程式員越容易忘記在訪問狀態時使用正確的同步”。這裡的狀態就是指的容器的數據元素。

即使同步容器在單方法的上能夠做到“安全”,但是它會使CPU的吞吐量下降、降低系統的伸縮性,因此才有了下麵的併發容器。

併發容器

針對於每一種同步容器,都設計了一個對應的併發容器:

隊列(Queue)和雙向隊列(Deque)是新增的集合類型。

ConcurrentHashMap使用分段鎖運行多個線程併發訪問

為瞭解決同步訪問時的低吞吐量問題,ConcurrentHashMap使用了分段鎖的方式,允許多個線程併發訪問。

同步鎖的機制是,使用16把鎖,每一把鎖負責1/16的散列桶的同步訪問。你可能猜到了,如果存儲的數據採用了一個糟糕的散列函數,那麼ConcurrentHashMap的效果HashTable一樣了。

ConcurrentHashMap也有它的問題,既然允許多個線程同時訪問了,那麼size()和isEmpty()方法的結果就不准確了。書中說這是一種權衡,認為多線程狀態下size和isEmpty方法沒有那麼重要了。但是在使用ConcurrentHashMap是我們應該知道確實有這樣的問題。

電腦世界里經常出現這樣的權衡問題。是的,沒有免費午餐,得到一些好處的同時就需要付出另外一些代價。典型的例子就是分散式系統中的CAP原則,即一致性、可用性和分區容錯性三者不可兼得。

 CopyOnWriteArrayList應用於讀遠大於寫的場景

顧名思義,添加或修改時直接複製一個新的底層數組來存儲數據。因為要複製,所以比較適合應用於寫遠小於讀的場景。比如事件通知系統,註冊和註銷(寫)的操作就遠大於接收事件的操作。

 阻塞隊列應用於生產者-消費者模式

隊列嘛就是一個存儲單元,數據可以按序存入然後按序取出。關鍵在於阻塞。在生產者-消費者模式中。生成者可以往隊列里存數據,消費者負責從隊列里獲取數據。阻塞的含義是當隊列里沒有數據時,消費者在take數據時會被阻塞,直到有生產者往隊列里put了一個數據。相反,如果隊列里的數據已經滿了,那麼生產者也只能等到消費者take走了一個數據之後才能put數據。

阻塞隊列的兩個好處:

1,使生產者和消費者解耦,他們之間不需要額外的直接對話通信;

2,阻塞隊列可以協調生產者和消費者的速度,讓較快的一方等待較慢的一方,不至於使未處理的消息累積過大;

 

阻塞方法與中斷方法

個人總結:catch到InterruptException時要麼繼續往上拋,實在不能拋了就要標記當期線程為interrupt。

Thread.currentThread().interrupt();

切忌try-catch完了之後什麼都不做,直接給和諧了。 

 

同步工具類

CountDownLatch(計數器)-等待多個結果

 當需要等待多個條件都滿足時才執行下一步,就可以用Latch來做計數器:

public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        
        final Random r = new Random();
        
        final CountDownLatch latch = new CountDownLatch(5);
        for(int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(r.nextInt(5) * 1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    System.out.println(Thread.currentThread().getName() + " execute complated!");
                    latch.countDown();
                }
                
            }).start();
        }
        System.out.println("Wait for sub thread execute...");
        latch.await();
        System.out.println("All Thread execute complated!");
    }
    
}

 

測試結果:

Wait for sub thread execute...
Thread-4 execute complated!
Thread-1 execute complated!
Thread-0 execute complated!
Thread-3 execute complated!
Thread-2 execute complated!
All Thread execute complated!

 

Semaphore(信號量)控制資源併發訪問數量

 

Semaphore可以實現資源訪問控制,在初始化時可以指定一個數量,這個數量表示可以同時訪問資源的線程數。

也可以理解成許可證。訪問資源前問Semaphore獲取(acquire)訪問許可,如果還有剩餘的許可就能正常獲取到,否則就會等待,知道有其他線程歸還(release)許可了。

    public static void main(String[] args) {
        Semaphore s = new Semaphore(3);
        for(int i = 0;i<10;i++) {
            new Thread(new Runnable() {

                @Override
                public void run() {
                    try {
                        System.out.println(System.currentTimeMillis() +":"+ Thread.currentThread().getName() + " waiting for Permit...");
                        s.acquire();
                        System.out.println(System.currentTimeMillis() +":"+ Thread.currentThread().getName() + " doing his job...");
                        Thread.sleep(5000);
                        s.release();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                
            }).start();
        }
    }

 

測試結果如下,可以看到每次同時執行的線程數永遠只有3個:

1514272606904:Thread-0 waiting for Permit...
1514272606904:Thread-3 waiting for Permit...
1514272606904:Thread-0 doing his job...
1514272606904:Thread-1 waiting for Permit...
1514272606905:Thread-5 waiting for Permit...
1514272606904:Thread-3 doing his job...
1514272606905:Thread-1 doing his job...
1514272606905:Thread-4 waiting for Permit...
1514272606905:Thread-6 waiting for Permit...
1514272606905:Thread-2 waiting for Permit...
1514272606905:Thread-7 waiting for Permit...
1514272606905:Thread-8 waiting for Permit...
1514272606905:Thread-9 waiting for Permit...

1514272611905:Thread-5 doing his job...
1514272611905:Thread-6 doing his job...
1514272611905:Thread-4 doing his job...

1514272616905:Thread-7 doing his job...
1514272616905:Thread-2 doing his job...
1514272616905:Thread-8 doing his job...

1514272621906:Thread-9 doing his job...

 

當把Semaphore的許可數量設置為1時,Semaphore就變成了一個互斥鎖。

 

Barrier(柵欄)實現並行計算

柵欄有著和計數器一樣的功能,他們都可以等待一些線程執行完畢後再近些某項操作。

不同之處在於柵欄可以重置,它可以讓多個線程同時到達某個狀態或結果之後再繼續往下一個目標出發。

並行計算時,各個子線程計算的速度可能不一樣,需要等待每個線程計算完成之後再繼續執行下一步計算:

垂直線表示計算狀態,水平箭頭的長度表示計算時間的差異。

public class BarrierTest {
    
    static int hours = 0;
    static boolean stopAll = false;
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
            @Override
            public void run() {
                System.out.println("every on stop,wait for a minute.");
                hours++;
                if(hours>8) {
                    System.out.println("times up,Go off work!");
                    stopAll = true;
                }
            }
            
        });
        Random r = new Random();
        //barrier.
        for(int i = 0;i<3;i++) {
            new Thread(new Runnable() {

                @Override
                public void run() {
                    while(!stopAll) {
                        System.out.println(Thread.currentThread().getName() + " is working...");
                        try {
                            Thread.sleep(r.nextInt(2) * 1000);
                            barrier.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } catch (BrokenBarrierException e) {
                            e.printStackTrace();
                        }
                    }
                }
                
            }).start();
        }
    }

}

 

 測試結果:

... ...
every on stop,wait for a minute.
Thread-1 is working...
Thread-2 is working...
Thread-0 is working...
every on stop,wait for a minute.
Thread-1 is working...
Thread-0 is working...
Thread-2 is working...
every on stop,wait for a minute.
Thread-0 is working...
Thread-1 is working...
Thread-2 is working...
every on stop,wait for a minute.
times up,Go off work!

 

另外還有一種叫Exchanger的Barrier,它可以用來做線程間的數據交換。

 

構建高效可伸縮的結果緩存

簡單的用synchronize + HashMap實現結果緩存:

public class ComputeMemroyCache<T,V> {

    HashMap<T,V> cache = new HashMap<T,V>();
    Computable<T,V> computable;
    
    public ComputeMemroyCache(Computable<T,V> computable) {
        this.computable = computable;
    }
    
    public synchronized V compute(T t) {
        V result = cache.get(t);
        if(result == null) {
            result = computable.compute(t);
            cache.put(t, result);
        }
        return result;
    }
    
}
public interface Computable<T,V> {
    
    public V compute(T t);

}

這種緩存有時甚至比沒有緩存還要糟糕:

如果計算的對象不多,那麼系統僅僅是有個很長的熱身階段,否則的話,低命中率的緩存沒有起到實際的作用,糟糕的同步反而使程式的吞吐量急劇下降。

如果去掉同步,並且使用ConcurrentHashMap,結果會好一點兒,但是還是會出現重覆計算一個結果的情況。因為compute中有“先檢查後計算”的行為(非原子操作)。

這裡一個最嚴重的問題是,計算代碼和客戶度調用同步了,就是一定要計算到一個結果之後才往Map中緩存結果,如果計算時間過長,就會導致後面很多請求的堆積。下麵的改進中使用了FutureTask來講計算推遲到另外一個線程,從而可以立即將“正在計算”的動作存放都Map中:

public class FutureTaskComputeMemroyCache<T,V> {

    ConcurrentHashMap<T,FutureTask<V>> cache = new ConcurrentHashMap<T,FutureTask<V>>();
    Computable<T,V> computable;
    
    public FutureTaskComputeMemroyCache(Computable<T,V> computable) {
        this.computable = computable;
    }
    
    public  V compute(T t) throws InterruptedException, ExecutionException {
        FutureTask<V> result = cache.get(t);
        if(result == null) {
            result = new FutureTask<V>(new Callable<V>() {
                @Override
                public V call() throws Exception {
                    return computable.compute(t);
                }
            });
            cache.put(t, result);
            result.run();
        }
        return result.get();
    }
    
}

 

 

還缺一點兒,上面的代碼還是會存在重覆計算的問題。還是因為“檢查並計算”的複合操作!真是夠煩人的。這裡要記住:既然都使用了ConcurrentHashMap,那麼在存取值的時候一定要記住是否還能簡單的get,一定要考慮複合操作是否需要避免的問題。因為ConcurrentHashMap已經我們準備好瞭解決複合操作的putIfAbsent方法。使用了ConcurrentHashMap而沒使用putIfAbsent那太可惜也太浪費

public class FutureTaskComputeMemroyCache<T,V> {

    ConcurrentHashMap<T,FutureTask<V>> cache = new ConcurrentHashMap<T,FutureTask<V>>();
    Computable<T,V> computable;
    
    public FutureTaskComputeMemroyCache(Computable<T,V> computable) {
        this.computable = computable;
    }
    
    public  V compute(T t) throws InterruptedException, ExecutionException {
        FutureTask<V> result = cache.get(t);
        if(result == null) {
            result = new FutureTask<V>(new Callable<V>() {
                @Override
                public V call() throws Exception {
                    return computable.compute(t);
                }
            });
            FutureTask<V> existed = cache.putIfAbsent(t, result);
            if(existed==null) {//之前沒有啟動計算時這裡才需要啟動
                result.run();
            }
        }
        return result.get();
    }
    
}

 

待完善
1,如果FutureTask計算失敗,需要從緩存種移除;

2,緩存過期

這裡僅嘗試實現了緩存過期:

public class TimeoutFutureTaskComputeMemroyCache<T,V> {

    ConcurrentHashMap<T,ComputeFutureTask<V>> cache = new ConcurrentHashMap<T,ComputeFutureTask<V>>();
    Computable<T,V> computable;
    
    public TimeoutFutureTaskComputeMemroyCache(Computable<T,V> computable) {
        this.computable = computable;
    }
    
    public  V compute(T t) throws InterruptedException, ExecutionException {
        ComputeFutureTask<V> result = cache.get(t);
        if(result == null) {
            result = new ComputeFutureTask<V>(new Callable<V>() {
                @Override
                public V call() throws Exception {
                    return computable.compute(t);
                }
            },1000 * 60);//一分鐘超時
            ComputeFutureTask<V> existed = cache.putIfAbsent(t, result);
            if(existed==null) {//之前沒有啟動計算時這裡才需要啟動
                result.run();
            }else if(existed.timeout()) {//超時重新計算
                cache.replace(t, existed, result);
                result.run();
            }
        }
        return result.get();
    }
    
    class ComputeFutureTask<X> extends FutureTask<X>{
        
        long timestamp;
        long age;

        public ComputeFutureTask(Callable<X> callable,long age) {
            super(callable);
            timestamp = System.currentTimeMillis();
            this.age =age;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public void setTimestamp(long timestamp) {
            this.timestamp = timestamp;
        }
        
        public boolean timeout() {
            return System.currentTimeMillis() - timestamp > age;
        }
        
    }
    
}

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • node-schedule每次都是通過新建一個scheduleJob對象來執行具體方法。 時間數值按下表表示 指定時間有兩種方式1 字元串指定 *之間一定要加空格,否則不執行 每到秒數為4的倍數時執行 在秒位*後加/4 和後面的*之間要有空格schedule.scheduleJob('*/4 * * ...
  • Infi-chu: http://www.cnblogs.com/Infi-chu/ 模塊:filecmp 安裝:Python版本大於等於2.3預設自帶 功能:實現文件、目錄、遍歷子目錄的差異 常用方法: 1.單文件對比(cmp): 2.多文件對比(cmpfiles): 3.目錄對比(dircmp) ...
  • Description 給你一棵TREE,以及這棵樹上邊的距離.問有多少對點它們兩者間的距離小於等於K Input N(n<=40000) 接下來n-1行邊描述管道,按照題目中寫的輸入 接下來是k Output 一行,有多少對點之間的距離小於等於k Sample Input 7 1 6 13 6 3 ...
  • Zookerper在Linux上的安裝 最近在項目的時候,遇到一些linux的相關安裝,雖然不難,但是步驟不少,一不小心就會出錯,這樣去找錯誤費時費力,所以一般都是需要重新再來,實在是讓人頭疼,所以這裡做個總結,為需要的朋友留下一個參考,也給自己加深一下印象。 先來說一下zookeeper的安裝 要 ...
  • Spring Data JPA是依附於Spring Boot的,學習Spring Data JPA之前得先把Spring Boot的環境搭建起來。 先附上一個Spring Data JPA的官方鏈接:https://docs.spring.io/spring-data/jpa/docs/curren ...
  • 說明 上個學期暑假心血來潮,查了比較多的資料,寫了一個星期寫的貪吃蛇代碼,bug還有,開始游戲不能按空格,你按啥鍵開始都行,就是不能按空格,參考了C語言網的貪吃蛇,所以很像,裡面的代碼有參考,但是全是重新寫的,沒有直接照抄,不然也不會有bug了對不對 代碼裡面有我寫的註釋,應該可以看懂 貪吃蛇通過鏈 ...
  • 昨天,雷老師偶有閑致,評講了n周前的C程式設計作業。其中講到了一到求水仙花數的題,給出了一種漂亮的演算法,在此記錄下來。 原題 輸出所有的水仙花數,所謂水仙花數是指一個3位數,其各位數字立方和等於該數本身。 解題 思路 初始化i=100。 ①取i的各位數,百位a,十位b,個位c。 ②判斷i==a∧3+ ...
  • ServletContext.getRealPath() 是從當前servlet 在tomcat 中的存放文件夾開始計算起的 比如,有個servlet 叫 UploadServlet,它部署在tomcat 下麵以後的絕對路徑如下:"C:\Program Files\apache-tomcat-8.0 ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...