併發編程簡介 什麼是併發編程 串列 : 串列化,按照步驟進行,一步一步來,不能越級(比如洗茶具,打水,燒水,等水開,沖茶) 並行 : 多個任務一起執行(打水,燒水的時候洗茶具,等水開,沖茶) 並行的好處是可以縮短整個流程的時間 併發編程的目的 : 更加充分的利用資源 加快程式的響應速度(耗時任務,w ...
併發編程簡介
什麼是併發編程
串列 : 串列化,按照步驟進行,一步一步來,不能越級(比如洗茶具,打水,燒水,等水開,沖茶)
並行 : 多個任務一起執行(打水,燒水的時候洗茶具,等水開,沖茶)
並行的好處是可以縮短整個流程的時間
併發編程的目的 :
- 更加充分的利用資源
- 加快程式的響應速度(耗時任務,web伺服器)
- 簡化非同步事件的處理
什麼時候適合使用併發編程 :
- 任務會阻塞線程,導致之後的代碼不能執行
- 任務執行時間過長,可以劃分為分工明確的子任務,比如: 分段下載
- 任務間斷性執行,比如: 日誌列印
- 任務本身需要協作執行,比如: 生產者和消費者
併發編程之頻繁的上下文切換
上下文切換是指: cpu為線程分配時間片,時間片非常短(毫秒),cpu不停的切換線程執行,在切換前會保存上一個任務的狀態,以便下次切換回這個任務時,可以再載入這個任務的狀態,讓我們感覺是多個程式同時運行.
上下文的頻繁切換,會帶來一定的性能開銷.如何減少上下文切換的開銷呢?
- 無鎖(free-lock)併發編程 : 多線程競爭鎖,會引起上下文切換,所以可以使用一些辦法來避免使用鎖,如將數據的ID按照Hash演算法取模分段,不同的線程處理不同段的數據
- CAS演算法 : Java中的Atomic包使用CAS演算法來更新數據,而不需要加鎖.
- 使用最少的線程 : 避免創建不需要的線程,比如任務很少,但是創建了很多線程處理,這樣會造成大量線程處於等待狀態.
- 協程 : 在單線程里實現多任務的調度,併在單線程里維持多個任務間的切換. --GO
併發編程之死鎖
死鎖也就是多個線程互相占有對方想要的資源,造成了等待的問題,寫個程式顯示死鎖的情況:
public class DeadLockDemo {
private static final Object HAIR_A = new Object();
private static final Object HAIR_B = new Object();
public static void main(String[] args) {
new Thread(()->{
synchronized (HAIR_A){
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (HAIR_B){
System.out.println("HAIR_A在前");
}
}
}).start();
new Thread(()->{
synchronized (HAIR_B){
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (HAIR_A){
System.out.println("HAIR_B在前");
}
}
}).start();
}
}
上面的代碼就會出現死鎖,造成了程式永遠不會結束,可以打開命令行,輸入jconsole,打開工具,可以檢測死鎖的情況.
針對於上面的這種情況,如何預防死鎖呢? 就是不要嵌套死鎖,這麼使用很容易造成死鎖.
併發編程之線程安全
線程不完全也就是多個線程共同運行,執行某個操作,但操作的結果與預期的結果不相同,代碼演示:
/**
* @ClassName UnSafeThread
* @Author wz157
* @Date 2018/12/15 14:13
* @Description 線程不安全操作實例代碼
*/
public class UnSafeThread {
private static int count = 0;
/**
* 參數10表示有10個線程
* */
private static CountDownLatch countDownLatch = new CountDownLatch(10);
public static void create(){
count ++;
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(()->{
for (int i1 = 0; i1 < 100; i1++) {
create();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
countDownLatch.countDown();
}).start();
}
while(true){
if(countDownLatch.getCount() == 0){
System.out.println(count);
break;
}
}
}
}
這個時候可以嘗試將睡眠10ms去掉,這樣的話,實際結果與預期結果就會相同.
產生線程不安全問題的原因 : num++ 不是原子性操作,被拆分成好幾個步驟,在多線程併發執行的情況下,因為cpu調度,多線程快速切換,有可能兩個同一時刻都讀取了同一個num值,之後對它進行+1操作,導致線程安全問題.
併發編程之資源限制
硬體資源
伺服器: 1m
本機:2m
帶寬的上傳/下載速度、硬碟讀寫速度和CPU的處理速度。
軟體資源
資料庫連接 500個連接 1000個線程查詢 並不會因此而加快
socket
線程的基礎
進程與線程的區別
進程 : 是系統分配和管理資源的基本單位
線程 : 進程的一個執行單元,是進行內調度的實體,是CPU調度和分派的基本單位,是比進程更小的獨立運行的基本單元. 線程也被稱為輕量級線程,線程是程式執行的最小基本單元.
一個程式最少一個進程,一個進程至少一個線程.
進程有自己的獨立地址空間,每啟動一個進程,系統就會為它分配地址空間,建立數據表來維護代碼段、堆棧段和數據段,這種操作非常昂貴。
而線程是共用進程中的數據的,使用相同的地址空間,因此CPU切換一個線程的花費遠比進程要小很多,同時創建一個線程的開銷也比進程要小很多。
線程之間的通信更方便,同一進程下的線程共用全局變數、靜態變數等數據,而進程之間的通信需要以通信的方式進行。
如何處理好同步與互斥是編寫多線程程式的難點。
多進程程式更健壯,進程有獨立的地址空間,一個進程崩潰後,在保護模式下不會對其它進程產生影響,
而線程只是一個進程中的不同執行路徑。線程有自己的堆棧和局部變數,但線程之間沒有單獨的地址空間,所以可能一個線程出現問題,進而導致整個程式出現問題
額外補充一下程式的概念: 程式就是靜態的代碼,也就是源碼.
線程的狀態及其轉換
在jdk的Thread類中,有一個枚舉類State,裡面定義了線程的6種狀態:
初始(NEW) : 新創建一個線程對象,但還沒調用start()方法
運行(RUNNABLE) : 調用start方法,處於可運行狀態,等待操作系統的調度或者其他資源.
阻塞(BLOCKED) : 線程阻塞於synchronized鎖,等待獲取synchronized鎖的狀態.
等待(WAITING) : Object.wait(),join(),LockSupport.park(),進入該狀態的線程需要等待其他線程做出一些特定的動作(通知或中斷).也就是喚醒等
超時等待(TIME_WAITING) : Object.wait(long)、Thread.join()、LockSupport.parkNanos()、LockSupport.parkUntil,該狀態不同於WAITING,它可以在指定的時間內自行返回.
終止(TERMINATED) : 表明該線程已經執行完畢
線程的狀態切換 :
創建線程的方式
方式一 : 繼承Thread類
public class MyThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
public static void main(String[] args) {
MyThread myThread = new MyThread();
myThread.setName("線程1");
myThread.start();
}
}
方式二 : 實現Runnable介面
public class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.setName("MyRunnable");
thread.start();
}
}
一般會選擇第二種,因為java只允許單繼承,但會允許多實現,所以一般採用第二種,增加程式的健壯性,代碼可以共用,代碼跟數據相互獨立.
調用start方法和調用run方法的區別 : start()方式是啟動一個線程,而調用run方法相當於是普通方法的調用,不會重新啟動線程,相當於從頭到尾都是一個線程.
方式三 : 實現Callable介面
public class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("call() : " + Thread.currentThread().getName());
return "MyCallable";
}
public static void main(String[] args) {
// 創建對象
Callable<String> myCallable = new MyCallable();
// 創建FutureTask,並傳遞Callable介面的實現類對象
FutureTask<String> futureTask = new FutureTask<>(myCallable);
// 創建線程池對象
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 直接執行線程
executorService.submit(myCallable);
// 執行線程,通關過FutureTask來執行線程
executorService.execute(futureTask);
try {
System.out.println("futureTask.get() : " + futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
方式四 : 線程池的方式創建
public class ThreadPool {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(()->{
System.out.println(Thread.currentThread().getName());
});
}
}
方式五 : 匿名內部類的方式創建
public class MyInnerClass {
public static void main(String[] args) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
thread.setName("innerClass");
thread.start();
}
}
方式六 : lambda的方式創建
public class MyLambda {
public static void main(String[] args) {
new Thread(()->{
System.out.println(Thread.currentThread().getName());
}).start();
}
}
線程的掛起和恢復
什麼是線程的掛起? 線程的掛起操作實質上就是使線程進入“非可執行”狀態下,在這個狀態下CPU不會分給線程時間片,進入這個狀態可以用來暫停一個線程的運行。線上程掛起後,可以通過重新喚醒線程來使之恢復運行
為什麼掛起線程? cpu分配的時間片非常短、同時也非常珍貴。避免資源的浪費。
如何掛起線程?
被廢棄的方法
thread.suspend() 和 thread.resume()
可以使用的方法
wait() 暫停執行、放棄已經獲得的鎖、進入等待狀態
notify() 隨機喚醒一個在等待鎖的線程
notifyAll() 喚醒所有在等待鎖的線程,自行搶占cpu資源
什麼時候適合使用掛起線程? 等待某些未就緒的資源,讓線程進入掛起狀態.當資源就緒,調用notify方法讓線程進行運行狀態.
線程的中斷操作
stop()方法可以用來中斷線程,但是不要使用,已經廢棄,這個方法一旦使用,線程立刻停止,可能會引發相應的線程安全問題.
interrupt()方法可以用來中斷線程,看代碼:
public class InterruptDemo implements Runnable {
@Override
public void run() {
// 判斷狀態,是否打上標記,如果為true,就會退出
while (! Thread.currentThread().isInterrupted()){
System.out.println(Thread.currentThread().getName());
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new InterruptDemo());
thread.start();
Thread.sleep(2000L);
// interrupt方法只是打上標記
thread.interrupt();
}
}
自行定義標誌,用來進行判斷,模仿interrupt()方法
public class InterruptDemo2 implements Runnable {
private static volatile boolean FLAG = true;
@Override
public void run() {
// 判斷狀態,是否打上標記,如果為true,就會退出
while (FLAG){
System.out.println(Thread.currentThread().getName());
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new InterruptDemo2());
thread.start();
Thread.sleep(2000L);
FLAG = false;
}
}
線程的優先順序
線程的優先順序告訴程式該線程的重要程度有多大。如果有大量線程都被堵塞,都在等候運行,程式會儘可能地先運行優先順序的那個線程。但是,這並不表示優先順序較低的線程不會運行。若線程的優先順序較低,只不過表示它被准許運行的機會小一些而已。
線程的優先順序,最大優先順序是10,最小優先順序是1,預設的優先順序是5,優先順序大於10或小於0,會拋出異常.
代碼案例:
public class PriorityDemo {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
for (int i = 0; i < 20; i++) {
System.out.println(Thread.currentThread().getName());
}
});
thread.setPriority(Thread.MAX_PRIORITY);
thread.setName("Thread-01 : " + thread.getPriority());
thread.start();
Thread thread2 = new Thread(() -> {
for (int i = 0; i < 20; i++) {
System.out.println(Thread.currentThread().getName());
}
});
// 設置優先順序
thread2.setPriority(Thread.MIN_PRIORITY);
thread2.setName("Thread-02: " + thread2.getPriority());
thread2.start();
}
}
守護線程
線程的分類: 用戶線程和守護線程
守護線程: 任何一個守護線程都是整個程式中所有用戶線程的守護者,只要有活著的用戶線程,守護線程就活著。當JVM實例中最後一個非守護線程結束時,守護線程也隨JVM一起退出
守護線程的用處 : jvm垃圾清理線程
建議: 儘量少使用守護線程,因為其不可控. 不要在守護線程里去進行讀寫操作,執行計算邏輯.
代碼編寫:
public class DaemonThreadDemo implements Runnable {
@Override
public void run() {
while (true){
System.out.println(Thread.currentThread().getName());
}
}
public static void main(String[] args) throws InterruptedException {
// 用戶線程
Thread thread = new Thread(new DaemonThreadDemo());
// 在start之前調用,否則無效,並會拋出異常
thread.setDaemon(true);
thread.start();
// 當主線程退出,守護線程也結束
Thread.sleep(2000);
}
}
線程的安全性
什麼是線程安全性
當多個線程訪問某個類,不管運行時環境採用何種調度方式或者這些線程如何交替運行,並且在主調代碼中不需要任何額外的同步或協同,這個類都能表現出正確的行為,那麼就稱這個類是線程安全的. -----併發編程實戰一書
什麼是線程不安全? 多線程併發訪問時,得不到正確的結果.
原子性操作
什麼是原子性操作? 一個操作或多個操作,要麼全部執行並且執行過程不會被任何因素打斷,要麼都不執行. 這的原子性與mysql事務中的原子性是一樣的道理.
如何將非原子性操作變成原子性?
使用volatile關鍵字修飾變數,但是volatile僅僅保證可見性,並不保證原子性.
synchronized關鍵字,是的操作具有原子性.也就是在方法上加上synchronized關鍵字
深入理解synchronized
內置鎖 : 每個java對象都可以用做一個實現同步的鎖,這些鎖稱為內置鎖.線程進入同步代碼塊或方法的時候會自動獲取該鎖,在退出同步代碼塊或方法時會釋放該鎖. 獲得內置鎖的唯一途徑就是進入這個鎖保護的同步代碼塊或方法.
互斥鎖 : 內置鎖就是互斥鎖,這意味著最多只有一個線程能夠獲得該鎖,當線程A嘗試去獲取線程B持有的內置鎖時,線程A必須等待或者阻塞,直到線程B釋放這個鎖. 如果線程B不釋放這個鎖,那麼線程A將永遠等待下去.
synchronized的使用場景:
修飾普通的方法 : 鎖住對象實例(普通方法屬於對象)
修飾靜態方法 : 鎖住整個類(靜態方法屬於類),儘量不要使用
修飾代碼塊 : 鎖住傳入的對象(synchronized(lock))
synchronized代碼塊是由一塊monitorenter/monitorexit指令實現的,Monitor對象是同步的基本實現單元. jdk6之前,synchronized是使用操作系統的互斥鎖實現的,需要進行用戶態到內核態的奇幻,這個是無差別的重量級操作.在現在使用的jdk中,對synchronized進行了新的實現,也就是三種不同的鎖: 偏斜鎖、輕量級鎖、重量級鎖. 並且當JVM檢測到不同的競爭狀況時,會有鎖的升級和降級,自動切換到適合的鎖實現. 沒有競爭出現,預設會使用偏斜鎖,當有另外的線程試圖鎖定某個已經被偏斜過的對象,JVM會撤消偏斜鎖,切換成輕量級鎖,輕量級鎖依賴CAS操作來試圖獲取鎖,重試成功,使用普通的輕量級鎖;否則,進一步升級為重量級鎖.
volatile關鍵字
volatile關鍵字只能修飾變數,保證該對象的可見性. 禁止指令重排序(記憶體模型).
A、B兩個線程同時讀取volatile關鍵字修飾的對象,A讀取之後,修改了變數的值,修改後的值,對B線程來說是可見的.
使用場景 :
作為線程開關
單例、修飾對象實例,禁止指令重排序
作為線程開關:
public class VolatileDemo implements Runnable {
private static volatile boolean flag = true;
@Override
public void run() {
while(true){
if(flag){
System.out.println(Thread.currentThread().getName());
} else{
System.out.println("flag : " + flag);
break;
}
}
}
public static void main(String[] args) {
new Thread(new VolatileDemo()).start();
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
VolatileDemo.flag = false;
}
}
單例和線程安全
餓漢式----本身是線程安全的, 在類載入的時候,就已經進行實例化,無論這個類會不會被使用. 如果該類比較占記憶體,之後有沒有用到,就白白浪費資源.
public class HungerSingleton {
private static HungerSingleton ourInstance = new HungerSingleton();
public static HungerSingleton getInstance() {
return ourInstance;
}
/**
* 構造器必須私有化
*/
private HungerSingleton() {
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(()->{
HungerSingleton hungerSingleton = HungerSingleton.getInstance();
System.out.println(hungerSingleton);
}).start();
}
}
}
懶漢式 ---- 在需要的時候進行實例化
public class LazySingleton {
/**
* 加上volatile關鍵字,防止指令重排序
*/
private static volatile LazySingleton lazySingleton;
private LazySingleton(){}
public static LazySingleton getInstance(){
// 如果不存在,直接new; 如果存在,直接返回
if(null == lazySingleton){
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized(LazySingleton.class){
if(null == lazySingleton) {
lazySingleton = new LazySingleton();
}
}
}
return lazySingleton;
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(()->{
System.out.println(LazySingleton.getInstance());
}).start();
}
}
}
如何避免線程安全性問題
線程安全性問題成因 :
多線程環境
多個線程操作同一共用資源
對該共用資源進行了非原子性操作
如何避免(打破成因中三點任意一點)
將多線程環境改為單線程(必要的代碼訪問使用加鎖訪問)
將共用資源變為不共用(ThreadLocal、不共用、操作無狀態化、不可變)
將非原子性操作改成原子性操作(加鎖、使用JDK自帶的原子性操作的類、JUC提供的相應的併發工具類)
鎖
鎖的分類
自旋鎖 : 線程狀態及上下文切換消耗資源,當訪問共用資源的時間短,頻繁上下文切換不值得. jvm實現,使線程在沒獲得鎖的時候,不被掛起,轉而執行空迴圈,執行幾次空迴圈後,如果還沒獲得鎖,則被掛起.
阻塞鎖 : 阻塞鎖改變了線程的運行狀態,讓線程進入阻塞狀態進行等待,當獲得相應的信號(喚醒或時間)時,才可以進入線程的準備就緒狀態,轉為就緒狀態的所有線程,通過競爭,進入運行狀態.
重入鎖 : 支持線程再次進入的鎖.比如說ReentrantLock.
讀寫鎖 : 兩把鎖,讀鎖和寫鎖,寫寫互斥、讀寫互斥、讀讀共用
互斥鎖 : 任一時刻,只有一個線程訪問該對象
悲觀鎖 : 總是假設最壞的情況,每次拿數據的時候都會認為別人會修改,所以每次拿數據的時候都會上鎖,這樣別人想拿這個數據就會阻塞知道它拿到鎖.
樂觀鎖 : 每次那數據的時候都認為別人不會修改,所以不會上鎖,但在更新的時候會判斷在此期間別人有沒有去更新這個數據. 一幫有兩種實現方式: 版本號控制和CAS.
公平鎖 : 加鎖前先查看是否有排隊等待的線程,有的話優先處理排在前面的線程,也就是排隊等待,先到先得.
非公平鎖 : 線程加鎖時直接嘗試獲取鎖,獲取不到的話就自動到隊尾等待.
獨占鎖 : 獨占鎖模式下,每次只能有一個線程能持有鎖.
共用鎖 : 允許多個線程同時獲得鎖,併發訪問共用資源
偏斜鎖 : 偏斜鎖使用了一種等到競爭出現才釋放鎖的機制,所以當其他線程嘗試競爭偏斜鎖時,持有偏向鎖的線程才會釋放鎖. 簡單來說偏斜其實不算一種鎖,一種假的鎖.
輕量級鎖 : 當出現競爭條件是,jvm會撤銷偏斜鎖,嘗試加輕量級鎖,這個叫鎖的升級. 使用自旋鎖實現.
重量級鎖 : 嘗試加輕量級鎖失敗,會轉為重量級鎖,也是鎖的升級. 依靠操作系統來實現.
Lock介面
Lock的使用(使用ReentrantLock來實現) :
public class UnSafeThread {
private static int count = 0;
/**
* 參數10表示有10個線程
* */
private static CountDownLatch countDownLatch = new CountDownLatch(10);
private static Lock lock = new ReentrantLock();
public static void create(){
lock.lock();
count ++;
lock.unlock();
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(()->{
for (int i1 = 0; i1 < 100; i1++) {
create();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
countDownLatch.countDown();
}).start();
}
while(true){
if(countDownLatch.getCount() == 0){
System.out.println(count);
break;
}
}
}
}
Lock與synchronized的區別(ReentrantLock與synchronized的區別) :
Lock需要手動控制,也就是需要手動調用lock方法加鎖,調用unlock方法釋放鎖.Lock是一種樂觀鎖,使用CAS機制
synchronized在1.5之前依靠操作系統的互斥來實現,之後提供了三種實現: 偏斜鎖、輕量級鎖、重量級鎖,不需要手動調用,只需要將同步代碼代碼synchronized代碼塊中就好
實現了lock介面的鎖 :
內部的方法 :
實現自己的鎖
實現自己的鎖需要實現Lock介面,模仿ReentrantLock,重寫lock方法和unlock方法:
public class MyLock implements Lock {
/**
* 定義一個標識,用來判斷當前鎖是否已被持有
*/
private boolean isHoldLock = false;
/**
* 重入的線程
*/
private Thread holdLockThread = null;
/**
* 重入次數
*/
private int reentryCount = 0;
/**
* 同一時刻,能且僅能有一個線程獲取到鎖,其他線程只能等待該線程釋放鎖之後才能獲取到鎖
*/
@Override
public synchronized void lock() {
if(isHoldLock && Thread.currentThread() != holdLockThread){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
holdLockThread = Thread.currentThread();
isHoldLock = true;
reentryCount ++ ;
}
/**
*
*/
@Override
public synchronized void unlock() {
// 判斷當前線程是否是持有鎖的線程,是, 重入次數減去1,不是就不進行操作
if(Thread.currentThread() == holdLockThread){
reentryCount --;
// 重入次數為0的時候才允許釋放鎖
if(reentryCount == 0) {
notify();
isHoldLock = false;
}
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
使用可重入對象和可重入次數才實現可重入,如果不實現可重入,可能造成單條線程處於waiting狀態.
測試的代碼,表名自己實現的鎖可重入:
public class ReentryDemo {
public Lock lock = new MyLock();
public void methodA(){
lock.lock();
System.out.println("進入方法A");
methodB();
lock.unlock();
}
public void methodB(){
lock.lock();
System.out.println("進入方法B");
// methodA();
lock.unlock();
}
public static void main(String[] args) {
ReentryDemo reentryDemo = new ReentryDemo();
reentryDemo.methodA();
}
}
AbstractQueuedSynchronizer
AbstractQueuedSynchronizer----為實現依賴於先進先出(FIFO)等待隊列的阻塞鎖和相關同步器(信號量、事件等等)提供一個框架. 此類的設計目標是成為依靠單個原子 int 值來表示狀態的大多數同步器的一個有用基礎。子類必須定義更改此狀態的受保護方法,並定義哪種狀態對於此對象意味著被獲取或被釋放。 假定這些條件之後,此類中的其他方法就可以實現所有排隊和阻塞機制。子類可以維護其他狀態欄位,但只是為了獲得同步而只追蹤使用 getState()、setState(int) 和 compareAndSetState(int, int) 方法來操作以原子方式更新的 int 值。應該將子類定義為非公共內部幫助器類,可用它們來實現其封閉類的同步屬性。類 AbstractQueuedSynchronizer 沒有實現任何同步介面。而是定義了諸如 acquireInterruptibly(int) 之類的一些方法,在適當的時候可以通過具體的鎖和相關同步器來調用它們,以實現其公共方法。
此類支持預設的獨占模式和共用模式之一,或者二者都支持。處於獨占模式下時,其他線程試圖獲取該鎖將無法取得成功。在共用模式下,多個線程獲取某個鎖可能(但不是一定)會獲得成功。此類並不“瞭解”這些不同,除了機械地意識到當在共用模式下成功獲取某一鎖時,下一個等待線程(如果存在)也必須確定自己是否可以成功獲取該鎖。處於不同模式下的等待線程可以共用相同的FIFO隊列。通常,實現子類只支持其中一種模式,但兩種模式都可以在(例如)ReadWriteLock中發揮作用。只支持獨占模式或者只支持共用模式的子類不必定義支持未使用模式的方法。
此類通過支持獨占模式的子類定義了一個嵌套的AbstractQueuedSynchronizer.ConditionObject 類,可以將這個類用作Condition實現。isHeldExclusively()方法將報告同步對於當前線程是否是獨占的;使用當前getState() 值調用release(int)方法則可以完全釋放此對象;如果給定保存的狀態值,那麼acquire(int) 方法可以將此對象最終恢復為它以前獲取的狀態。沒有別的 AbstractQueuedSynchronizer方法創建這樣的條件,因此,如果無法滿足此約束,則不要使用它。AbstractQueuedSynchronizer.ConditionObject的行為當然取決於其同步器實現的語義。
此類為內部隊列提供了檢查、檢測和監視方法,還為condition對象提供了類似方法。可以根據需要使用用於其同步機制的 AbstractQueuedSynchronizer將這些方法導出到類中。
此類的序列化只存儲維護狀態的基礎原子整數,因此已序列化的對象擁有空的線程隊列。需要可序列化的典型子類將定義一個 readObject方法,該方法在反序列化時將此對象恢復到某個已知初始狀態。
主要方法:
tryAcquire(int)
tryRelease(int)
tryAcquireShared(int)
tryReleaseShared(int)
isHeldExclusively()
Acquire:
while (!tryAcquire(arg)) {
enqueue thread if it is not already queued;
possibly block current thread;
}
Release:
if ((arg))
unblock the first queued thread;
ReentrantLock
閱讀源碼的方法:
一段簡單的代碼
看構造
看類之間的關係,形成關係圖
看使用到的方法,並逐步理解,邊看代碼邊看註釋
debug
對於ReentrantLock的源碼分析以後研究,還有StampedLock
線程間的通信
wait notify notifyAll
何時使用: 在多線程環境下,有時候一個線程的執行,依賴於另外一個線程的某種狀態的改變,這個時候,我們就可以使用wait與notify或者notifyAll. 空迴圈
public class Demo {
private static boolean flag = false;
public static void main(String[] args) {
// 線程空迴圈,會造成資源的浪費
new Thread(()->{
while(!flag){
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("flag is false");
}
System.out.println("flag is true");
}).start();
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
flag = true;
}).start();
}
}
wait和sleep的區別 : wait會釋放持有的鎖,而sleep不會,sleep是讓線程在指定的時間內,不去搶占cpu的資源.
註意點 : wait和notify必須放在同步代碼塊中,且必須擁有當前對象鎖.也就是不能利用A對象的鎖去調用B對象的wait, 那個對象的wait就得調用哪個對象的notify. 簡單理解wait和notify是依賴與對象存在的.
public class Demo1 {
private static boolean flag = false;
public static void main(String[] args) {
Object obj = new Object();
// 線程空迴圈,會造成資源的浪費
new Thread(()->{
synchronized (obj){
while(!flag){
System.out.println("flag is false");
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("flag is true");
}).start();
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
synchronized (obj){
flag = true;
obj.notify();
}
}).start();
}
}
notify與notifyAll的區別 : notify會隨機喚醒一個等待的線程. notifyAll喚醒所有等待的線程.
等待通知之生產消費
生產者消費者模型一般包括三個角色 : 生產者 消費者 中間商
代碼進行展示:
中間商的代碼:
public class Medium {
/**
* 庫存
*/
private int num = 0;
/**
* 庫存最大容量
*/
private static final int TOTAL = 20;
/**
* 生產
*/
public synchronized void put(){
// 判斷當前庫存是否已達最大容量
if(num < TOTAL){
// 如果不是,生產完成之後通知消費者進行消費
System.out.println("生產者進行生產 : " + (++ num));
// 喚醒所有線程,讓消費者進行消費
notifyAll();
} else {
// 如果是,通知生產者暫停生產,進入等待,等待消費者進行消費
try {
System.out.println("庫存已滿,新增操作等待!!!" + num);
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 消費
*/
public synchronized void take(){
// 判斷當前庫存是否大於0
if(num > 0){
// 是,進行消費,通知生產者進行生產
System.out.println("消費者進行消費,消費過後的剩餘產品數 : " + (--num));
// 喚醒所有線程
notifyAll();
} else{
// 不是,消費者等待,通知生產者進行生產
System.out.println("商品數量為0,等待生產者進行生產 : " + num);
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
生產者代碼:
public class Producer implements Runnable{
private Medium medium;
public Producer(Medium medium) {
this.medium = medium;
}
@Override
public void run() {
while(true){
medium.put();
}
}
}
消費者代碼:
public class Consumer implements Runnable{
private Medium medium;
public Consumer(Medium medium) {
this.medium = medium;
}
@Override
public void run() {
while(true) {
medium.take();
}
}
}
測試類代碼:
public class Main {
public static void main(String[] args) {
Medium medium = new Medium();
for (int i = 0; i < 10; i++) {
new Thread(new Producer(medium)).start();
}
for (int i = 0; i < 10; i++) {
new Thread(new Consumer(medium)).start();
}
}
}
使用管道流進行通信
以記憶體為媒介,用於線程之間的數據傳輸. 主要有面向位元組(PipedOutputStream,PipedInputStream)和麵向字元(PipedReader,PipedWriter)兩種.
管道的讀操作:
public class Reader implements Runnable{
/**
* 讀位元組管道流
*/
private PipedInputStream pipedInputStream;
public Reader(PipedInputStream pipedInputStream) {
this.pipedInputStream = pipedInputStream;
}
@Override
public void run() {
if(pipedInputStream != null){
String collect = new BufferedReader(new InputStreamReader(pipedInputStream)).lines().collect(Collectors.joining("\n"));
System.out.println(Thread.currentThread().getName() + ":" + collect);
}
try {
pipedInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 管道流通信的測試
public class Main {
public static void main(String[] args) throws IOException {
PipedInputStream pipedInputStream = new PipedInputStream();
PipedOutputStream pipedOutputStream = new PipedOutputStream();
try {
// 管道連接
pipedOutputStream.connect(pipedInputStream);
} catch (IOException e) {
e.printStackTrace();
}
// 創建線程
new Thread(new Reader(pipedInputStream)).start();
BufferedReader bufferedReader = null;
try {
// 字元輸入流
bufferedReader = new BufferedReader(new InputStreamReader(System.in));
// 管道流進行寫入
pipedOutputStream.write(bufferedReader.readLine().getBytes());
} finally {
pipedOutputStream.close();
if(bufferedReader != null){
bufferedReader.close();
}
}
}
}
Thread.join通信
使用場景 : 線程A執行到一半,需要一個數據,這個數據需要線程B去執行修改,只有B修改完成之後,A才能繼續操作.
public class Main {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "開始運行");
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "運行結束");
}, "線程1");
new Thread(()->{
System.out.println(Thread.currentThread().getName() + "開始運行");
thread.start();
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "運行結束");
},"線程2").start();
}
}
上面的執行結果是 線程2開始 線程1開始 線程1結束 線程2結束, 如果不使用thread.join的結果呢? 線程2開始 線程2結束 線程1開始 線程1結束,所以thread.join的目的就是將當前的時間片分給thread,等thread執行完繼續前面的操作
ThreadLocal的使用
ThreadLock : 線程變數,是一個以ThreadLocal對象為鍵、任意對象為值的存儲結構. 為每個線程單獨存放一份變數副本,也就是說一個線程可以根據一個ThreadLocal對象查詢到綁定在這個線程上的一個值. 只要線程處於活動狀態並且ThreadLocal實例可訪問,那麼每個線程都擁有對其本地線程副本的隱式引用變數,一個線程消失後,它的所有副本線程局部實例受垃圾回收(除非存在其他對這些副本的引用)
一般用的比較多的是:
ThreadLocal.get : 獲得ThreadLocal中當前線程共用變數的值
ThreadLocal.set : 設置ThreadLocal中當前線程共用變數的值
ThreadLocal.remove : 移除ThreadLocal中當前線程共用變數的值
ThreadLocal.initialValue : ThreadLocal沒有被當前線程賦值時或當前線程剛調用remove方法後調用get方法,返回此方法值.
代碼展示:
public class ThreadLocalDemo {
/**
* 為num賦值為0,ThreadLocal為每個對象存放一個單獨的副本
*/
ThreadLocal<Integer> num = ThreadLocal.withInitial(()->0);
/**
* 對num值進行自增
*/
public void inCreate(){
Integer myNum = num.get();
myNum ++;
System.out.println(Thread.currentThread().getName() + ":" + myNum);
num.set(myNum);
}
public static void main(String[] args) {
ThreadLocalDemo localDemo = new ThreadLocalDemo();
for (int i = 0; i < 3; i++) {
int finalI = i;
new Thread(()->{
while(true) {
localDemo.inCreate();
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
ThreadLocal是為每個對象存放一個單獨的副本,對象與對象間互不影響
Condition的使用
Condition : 可以在一個鎖裡面,存在多種等待條件
主要的方法 :
await
signal
signalAll
參考前面的生產者和消費者,修改中間的代碼為下:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @ClassName Medium
* @Author wz157
* @Date 2018/12/17 16:37
* @Description 中間商
*/
public class Medium {
/**
* 庫存
*/
private int num = 0;
/**
* 庫存最大容量
*/
private static final int TOTAL = 20;
/**
* 創建鎖,ReentrantLock
*/
private Lock lock = new ReentrantLock();
/**
* 聲明變數,代表消費者的鎖對象
*/
private Condition consumerCondition = lock.newCondition();
/**
* 聲明變數,代表生產者的鎖對象
*/
private Condition producerCondition = lock.newCondition();
/**
* 生產
*/
public void put(){
lock.lock();
try {
// 判斷當前庫存是否已達最大容量
if(num < TOTAL){
// 如果不是,生產完成之後通知消費者進行消費
System.out.println("生產者進行生產 : " + (++ num));
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 喚醒消費者,讓消費者進行消費
consumerCondition.signalAll();
} else {
// 如果是,通知生產者暫停生產,進入等待,等待消費者進行消費
try {
System.out.println("庫存已滿,新增操作等待!!!" + num);
producerCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
lock.unlock();
}
}
/**
* 消費
*/
public void take(){
lock.lock();
try{
// 判斷當前庫存是否大於0
if(num > 0){
// 是,進行消費,通知生產者進行生產
System.out.println("消費者進行消費,消費過後的剩餘產品數 : " + (--num));
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 喚醒所生產者
producerCondition.signalAll();
} else{
// 不是,消費者等待,通知生產者進行生產
System.out.println("商品數量為0,等待生產者進行生產 : " + num);
try {
consumerCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
lock.unlock();
}
}
}