JUC學習 1.什麼是JUC java.util 工具包、包、分類 業務:普通的線程代碼 Thread Runnable 沒有返回值、效率相比入 Callable 相對較低! 2.線程和進程 線程、進程,如果不能使用一句話說出來的技術,不扎實! 進程:一個程式,QQ.exe Music.exe 程式 ...
JUC學習
1.什麼是JUC
java.util 工具包、包、分類
業務:普通的線程代碼 Thread
Runnable 沒有返回值、效率相比入 Callable 相對較低!
2.線程和進程
線程、進程,如果不能使用一句話說出來的技術,不扎實!
進程:一個程式,QQ.exe Music.exe 程式的集合;
一個進程往往可以包含多個線程,至少包含一個!
Java預設有幾個線程? 2 個 mian、GC
線程:開了一個進程 Typora,寫字,自動保存(線程負責的)
對於Java而言:Thread、Runnable、Callable
Java 真的可以開啟線程嗎? 答:開不了
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0()
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
// 本地方法,底層的C++ ,Java 無法直接操作硬體
private native void start0();
併發、並行
併發編程:併發、並行
併發(多線程操作同一個資源)
- CPU 一核 ,模擬出來多條線程,天下武功,唯快不破,快速交替
並行(多個人一起行走)
- CPU 多核 ,多個線程可以同時執行; 線程池
public class Test01 {
public static void main(String[] args) {
//獲取cpu的核數
// cpu 密集型,IO密集型
System.out.println(Runtime.getRuntime().availableProcessors());
}
}
併發編程的本質:充分利用CPU的資源
線程有幾個狀態
public enum State {
// 新生
NEW,
// 運行
RUNNABLE,
// 阻塞
BLOCKED,
// 等待,死死地等
WAITING,
// 超時等待
TIMED_WAITING,
// 終止
TERMINATED;
}
wait/sleep 區別
1、來自不同的類
wait => Object
sleep => Thread
2、關於鎖的釋放
wait 會釋放鎖,sleep 睡覺了,抱著鎖睡覺,不會釋放!
3、使用的範圍是不同的
wait必須在同步代碼塊中
sleep可以在任何地方睡
4、是否需要捕獲異常
wait 不需要捕獲異常
sleep 必須要捕獲異常
3.Lock鎖(重點)
傳統 Synchronized
package com.jihu;
// 基本的賣票例子
/**
* 真正的多線程開發,公司中的開發,降低耦合性
* 線程就是一個單獨的資源類,沒有任何附屬的操作!
* 1、 屬性、方法
*/
public class SaleTicketDemo01 {
public static void main(String[] args) {
// 併發:多線程操作同一個資源類, 把資源類丟入線程
Ticket ticket = new Ticket();
// @FunctionalInterface 函數式介面,jdk1.8 lambda表達式 (參數)->{ 代碼 }
new Thread(()->{
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"C").start();
}
}
//資源類 OOP
class Ticket{
//屬性、方法
private int number = 10;
//賣票的方式
//synchronized 本質:隊列,鎖
public synchronized void sale(){
if (number > 0){
System.out.println(Thread.currentThread().getName()+"賣出了"+(number--)+"票,剩餘:"+number);
}
}
}
Lock 介面
公平鎖:十分公平:可以先來後到 非公平鎖:
十分不公平:可以插隊 (預設)
package com.jihu;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class SaleTicketDemo02 {
public static void main(String[] args) {
// 併發:多線程操作同一個資源類, 把資源類丟入線程
Ticket2 ticket = new Ticket2();
new Thread(()->{ for (int i = 0; i < 20; i++) { ticket.sale();}},"A").start();
new Thread(()->{ for (int i = 0; i < 20; i++) { ticket.sale();}},"B").start();
new Thread(()->{ for (int i = 0; i < 20; i++) { ticket.sale();}},"C").start();
}
}
// Lock三部曲
// 1、 new ReentrantLock();
// 2、 lock.lock(); // 加鎖
// 3、 finally=> lock.unlock(); // 解鎖
class Ticket2{
//屬性、方法
private int number = 30;
Lock lock = new ReentrantLock();
public synchronized void sale(){
lock.lock(); //加鎖
lock.tryLock(); //加入後不會一直等下去,其他線程阻塞的話
try {
// 業務代碼
if (number > 0){
System.out.println(Thread.currentThread().getName()+"賣出了"+(number--)+"票,剩餘:"+number);
}
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock(); //解鎖
}
}
}
Synchronized 和 Lock 區別
1.Synchronized 內置的java關鍵字,Lock是一個java類
2.Synchronized 無法判斷獲取鎖的狀態,Lock可以判斷是否獲取到了鎖
3.Synchronized 會自動釋放鎖,lock必須手動釋放鎖,如果不釋放鎖,會導致死鎖
4.Synchronized 線程1(獲得鎖,阻塞)、線程2(等待,傻傻的等) ; lock鎖就不一定會等待下去;
5.Synchronized 可重入鎖,不可以中斷的,非公平; Lock,可重入鎖,可以判斷鎖,非公平(可以自己設置)
6.Synchronized 適合鎖少量的代碼同步問題,lock鎖適合鎖大量的同步代碼
4.生產者和消費者問題
面試常考點的:單例模式、排序演算法、生產者和消費者、死鎖
生產者和消費者問題 Synchronized 版
package com.jihu.productor;
/**
* 線程之間的通信問題:生產者和消費者問題! 等待喚醒,通知喚醒
* 線程交替執行 A B 操作同一個變數 num = 0
* A num+1
* B num-1
*/
public class A {
public static void main(String[] args) {
Date date = new Date();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
date.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
date.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
}
}
//可以將生產消費者模式分為 ( 判斷等待,業務,通知 ) 三個步驟
class Date{ //數字 資源類
private int number = 0;
//+1
public synchronized void increment() throws InterruptedException {
if (number != 0){
//等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName()+"=>"+number);
//通知其他線程,我+1完畢了
this.notify();
}
//-1
public synchronized void decrement() throws InterruptedException {
if (number == 0){
//等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName()+"=>"+number);
//通知其他線程,我-1完畢了
this.notify();
}
}
問題存在,A B C D 4 個線程! 虛假喚醒
if 改為 while 判斷
就可以改變此問題的發生
JUC版的生產者和消費者問題
通過Lock 找到 Condition
package com.jihu.productor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class B {
public static void main(String[] args) {
Date2 date = new Date2();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
date.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
date.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
date.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
date.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
//可以將生產消費者模式分為 ( 判斷等待,業務,通知 ) 三個步驟
class Date2{ //數字 資源類
private int number = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
//+1
public synchronized void increment() throws InterruptedException {
lock.lock(); //上鎖
try {
while (number != 0){
//等待
condition.await(); //等待
}
number++;
System.out.println(Thread.currentThread().getName()+"=>"+number);
//通知其他線程,我+1完畢了
condition.signalAll(); //喚醒全部
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); //解鎖
}
}
//-1
public synchronized void decrement() throws InterruptedException {
lock.lock();
try {
while (number == 0){
//等待
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName()+"=>"+number);
//通知其他線程,我-1完畢了
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
任何一個新的技術,絕對不是僅僅只是覆蓋了原來的技術,優勢和補充!
Condition 精準的通知和喚醒線程
package com.jihu.productor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
//A 執行完調用B,B執行完調用C,C執行完調用A
public class C {
public static void main(String[] args) {
Data3 data3 = new Data3();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data3.printA();
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data3.printB();
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data3.printC();
}
},"C").start();
}
}
class Data3{ //資源類 lock
private Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
private int number = 1; // 1時A執行,2 B執行,3 C執行
public void printA(){
lock.lock();
try {
//業務,判斷-> 執行-> 通知
while (number != 1){
//等待
condition1.await();
}
System.out.println(Thread.currentThread().getName()+"=>AAAAAAA");
//喚醒,喚醒指定的人, B
number = 2;
condition2.signal(); //喚醒2
} catch (Exception exception) {
exception.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB(){
lock.lock();
try {
while (number != 2){
condition2.await();
}
System.out.println(Thread.currentThread().getName()+"=>BBBBBBB");
//喚醒 ,喚醒指定的人,C
number = 3;
condition3.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC(){
lock.lock();
try {
while (number != 3){
condition3.await();
}
System.out.println(Thread.currentThread().getName()+"=>CCCCCCC");
//喚醒 ,喚醒指定的人,A
number = 1;
condition1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
測試
5. 8鎖現象
如何判斷鎖的是誰!永遠的知道什麼鎖,鎖到底鎖的是誰!
深刻理解我們的鎖
測試1
package com.jihu.lock8;
import java.util.concurrent.TimeUnit;
/*
8鎖:就是關於鎖的8個問題
1、標準情況下,兩個線程先列印 發簡訊還是 打電話? 1/發簡訊 2/打電話
2、sendSms延遲4秒,兩個線程先列印 發簡訊還是 打電話? 1/發簡訊 2/打電話
*/
public class Test01 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{
phone.sendSms();
},"A").start();
//捕獲
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
phone.call();
},"B").start();
}
}
class Phone{
//synchronized 鎖的對象是方法的調用者
//兩個方法用的是同一個鎖,誰先拿到誰執行
public synchronized void sendSms() {
// TimeUnit.SECONDS.sleep(4);
System.out.println("發簡訊");
}
public synchronized void call(){
System.out.println("打電話");
}
}
測試2
package com.jihu.lock8;
import java.util.concurrent.TimeUnit;
/*
3. 增加了一個普通方法後 是先執行發簡訊還是hello? 答案是: 普通方法
4. 兩個對象,兩個同步方法,發簡訊還是 打電話? //打電話
*/
public class Test02 {
public static void main(String[] args) {
//兩個對象,兩個調用者,兩把鎖
Phone2 phone = new Phone2();
Phone2 phone2 = new Phone2();
new Thread(()->{
try {
phone.sendSms();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
//捕獲
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
phone2.call();
},"B").start();
}
}
class Phone2{
//synchronized 鎖的對象是方法的調用者
public synchronized void sendSms() throws InterruptedException {
TimeUnit.SECONDS.sleep(3);
System.out.println("發簡訊");
}
public synchronized void call(){
System.out.println("打電話");
}
//這裡沒有鎖,不是同步方法,不受鎖的影響
public void hello(){
System.out.println("hello");
}
}
測試3
package com.jihu.lock8;
import java.util.concurrent.TimeUnit;
/*
5. 增加兩個靜態的同步方法,只有一個對象,是先列印 發簡訊還是打電話? 答:發簡訊
6.兩個對象! 增加兩個靜態的同步方法,先列印 發簡訊還是打電話? 答:發簡訊
*/
public class Test03 {
public static void main(String[] args) {
//兩個對象的Classlei 模板 只有一個 ,static ,鎖的是Class
Phone3 phone = new Phone3();
Phone3 phone2 = new Phone3();
new Thread(()->{
try {
phone.sendSms();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
//捕獲
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
// phone.call();
phone2.call();
},"B").start();
}
}
//Phone3唯一的一個Class對象
class Phone3{
//synchronized 鎖的對象是方法的調用者
//static 靜態方法
//類一載入就有了 鎖的是Class
public static synchronized void sendSms() throws InterruptedException {
TimeUnit.SECONDS.sleep(3);
System.out.println("發簡訊");
}
public static synchronized void call(){
System.out.println("打電話");
}
}
//測試:
發簡訊
打電話
測試4
package com.jihu.lock8;
import java.util.concurrent.TimeUnit;
/*
7. 1個靜態的同步方法,1個普通的同步方法,一個對象,先列印 發簡訊還是打電話? 答:打電話
8. 1個靜態的同步方法,1個普通的同步方法,兩個對象,先列印 發簡訊還是打電話? 答: 打電話
*/
public class Test04 {
public static void main(String[] args) {
//兩個對象的Classlei 模板 只有一個 ,static ,鎖的是Class
Phone4 phone = new Phone4();
Phone4 phone2 = new Phone4();
new Thread(()->{
try {
phone.sendSms();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
//捕獲
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
// phone.call();
phone2.call();
},"B").start();
}
}
class Phone4{
//靜態的同步方法,鎖的是Class類模板
public static synchronized void sendSms() throws InterruptedException {
TimeUnit.SECONDS.sleep(3);
System.out.println("發簡訊");
}
//普通的同步方法 鎖的調用者
public synchronized void call(){
System.out.println("打電話");
}
}
//測試:
打電話
發簡訊
小結
new this 具體的一個手機
static Class 唯一的一個模板
6. 集合類不安全
List 不安全
package com.jihu.unsafe;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
// java.util.ConcurrentModificationException 併發修改異常!
public class ListTest {
public static void main(String[] args) {
// 併發下 ArrayList 不安全的嗎,Synchronized;
/**
* 解決方案:
* 1.List<String> list = new Vector<>();
* 2. List<String> list = Collections.synchronizedList(new ArrayList<>());
* 3.List<String> list = new CopyOnWriteArrayList<>();
*/
//CopyOnWrite 寫入時複製, COW 電腦程式設計領域的一種優化策略
//多個線程調用的時候,list,讀取的時候,固定的,寫入(覆蓋)
//在寫如的時候避免覆蓋,造成數據問題
//讀寫分離
//CopyOnWriteArrayList比Vector NB在哪裡?
// List<String> list = new ArrayList<>(); //會出現ConcurrentModificationException 異常
// List<String> list = new Vector<>();
// List<String> list = Collections.synchronizedList(new ArrayList<>());
List<String> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 10; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)).start();
}
}
/* public static void main(String[] args) {
List<String> list = Arrays.asList("1", "2", "3");
list.forEach(System.out::println);
}*/
}
Set 不安全
package com.jihu.unsafe;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
//同理可證java.util.ConcurrentModificationException 併發修改異常
/*
解決方法:
1.Set<String> set = Collections.synchronizedSet(new HashSet<>());
2. Set<String> set = new CopyOnWriteArraySet<>();
*/
public class SetTest {
public static void main(String[] args) {
// Set<String> set = new HashSet<>(); //會導入出現併發修改異常
// Set<String> set = Collections.synchronizedSet(new HashSet<>());
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 0; i < 100; i++) {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(set);
},String.valueOf(i)).start();
}
}
}
hashSet 底層是什麼?
public HashSet() {
map = new HashMap<>();
}
//add set本質就是map key是無法重覆的
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
private static final Object PRESENT = new Object(); //不變的值
Map 不安全
回顧Map基本操作
package com.jihu.unsafe;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
//java.util.ConcurrentModificationException 併發修改異常
public class MapTest {
public static void main(String[] args) {
//map是這樣用的嗎? 不是,工作中不用 HashMap
//預設等價於什麼 ? new HashMap<>(16,0.75)
// Map<String, String> map = new HashMap<>(); //出現併發修改異常
//研究ConcurrentHashMap底層原理 在jdk幫助文檔中查看
Map<String, String> map = new ConcurrentHashMap<>();
for (int i = 0; i < 100; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
},String.valueOf(i)).start();
}
}
}
7.Callable(簡單)
1、可以有返回值
2、可以拋出異常
3、方法不同,run()/ call()
package com.jihu.callable;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// new Thread(new Runnable()).start();
// new Thread(new FutureTask()).start();
// new Thread(new FutureTask( Callable )).start();
new Thread().start(); // 怎麼啟動Callable
MyThread myThread = new MyThread();
FutureTask<Integer> futureTask = new FutureTask<>(myThread); //適配類
new Thread(futureTask,"A").start();
new Thread(futureTask,"B").start(); //就會出現一次call,因為結果會被緩存
Integer o = futureTask.get(); //這個get 方法可能會產生阻塞,把他放到最後,或者使用非同步通信來處理
System.out.println(o);
}
}
class MyThread implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println("call");
return 1024;
}
}
//展示結果
call
1024
細節:
1、有緩存
2、結果可能需要等待,會阻塞!
8.常用的輔助類(必會)
8.1 CountDownLatch
//CountDownLatch 相當於減法計數器
package com.jihu.add;
import java.util.concurrent.CountDownLatch;
//計數器
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
//總數是6 , 在一些必須要執行任務的時候再使用。
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" go out");
countDownLatch.countDown(); //數量-1
},String.valueOf(i)).start();
}
countDownLatch.await(); //等待計數器歸零, 然後再向下執行
System.out.println("close door");
}
}
//測試結果
0 go out
1 go out
4 go out
3 go out
5 go out
2 go out
close door
原理:
countDownLatch.countDown();
// 數量-1
countDownLatch.await();
// 等待計數器歸零,然後再向下執行
每次有線程調用 countDown() 數量-1,假設計數器變為0,countDownLatch.await() 就會被喚醒,繼續 執行!
8.2 CyclicBarrier
//CyclicBarrier 想當於加法計數器
package com.jihu.add;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("集齊7顆龍珠,召喚神龍");
});
for (int i = 1; i <= 7; i++) {
final int temp = i; //在lambda中不能直接使用本地i,作用域不同,所以需要轉為final類型
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"收集了"+temp+"顆龍珠");
try {
cyclicBarrier.await(); //沒收集7顆龍珠則等待
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
//測試
Thread-5收集了6顆龍珠
Thread-1收集了2顆龍珠
Thread-3收集了4顆龍珠
Thread-6收集了7顆龍珠
Thread-4收集了5顆龍珠
Thread-2收集了3顆龍珠
Thread-0收集了1顆龍珠
集齊7顆龍珠,召喚神龍
8.3 Semaphore
Semaphore:信號量
搶車位!
6車---3個停車位置
package com.jihu.add;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
//線程數量: 停車位 , 限流中都會用到
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 6; i++) {
new Thread(()->{
try {
//acquire 得到
semaphore.acquire(); //得到
System.out.println(Thread.currentThread().getName()+" 搶到了車位");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" 離開了車位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//release(); 釋放
semaphore.release(); // 釋放車位
}
},String.valueOf(i)).start();
}
}
}
//測試結果
1 搶到了車位
5 搶到了車位
0 搶到了車位
1 離開了車位
5 離開了車位
0 離開了車位
2 搶到了車位
3 搶到了車位
4 搶到了車位
2 離開了車位
3 離開了車位
4 離開了車位
原理:
semaphore.acquire()
獲得,假設如果已經滿了,等待,等待被釋放為止!
semaphore.release();
釋放,會將當前的信號量釋放 + 1,然後喚醒等待的線程!
作用: 多個共用資源互斥的使用!併發限流,控制最大的線程數!
9.讀寫鎖
package com.jihu.rw;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 獨占鎖(寫鎖) 一次只能被一個線程占有
* 共用鎖(讀鎖) 多個線程可以同時占有
* ReadWriteLock
* 讀-讀 可以共存!
* 讀-寫 不能共存!
* 寫-寫 不能共存!
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
// MyCache myCache = new MyCache();
MyCacheLock myCache = new MyCacheLock();
//寫入
for (int i = 0; i < 5; i++) {
final int temp = i;
new Thread(()->{
myCache.put(temp+"",temp+"");
},String.valueOf(temp)).start();
}
//讀取
for (int i = 0; i < 5; i++) {
final int temp = i;
new Thread(()->{
myCache.get(temp+"");
},String.valueOf(temp)).start();
}
}
}
//加鎖的
class MyCacheLock{
private volatile Map<String,Object> map = new HashMap<>();
//讀寫鎖, 更加細粒度的控制
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//存,寫入的時候,只希望同時只有一個線程寫
public void put(String key,Object value){
readWriteLock.writeLock().lock();//寫加鎖
try {
System.out.println(Thread.currentThread().getName()+"寫入"+key);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"寫入OK");
} catch (Exception exception) {
exception.printStackTrace();
} finally {
readWriteLock.writeLock().unlock(); //寫解鎖
}
}
//取 ,讀時 所有人都可以讀
public void get(String key){
readWriteLock.readLock().lock(); //讀加鎖
try {
System.out.println(Thread.currentThread().getName()+"讀取"+key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"讀取OK");
} catch (Exception exception) {
exception.printStackTrace();
} finally {
readWriteLock.readLock().unlock(); //讀解鎖
}
}
}
//測試結果
/* 4寫入4
4寫入OK
3寫入3
3寫入OK
0讀取0
0讀取OK
1讀取1
1讀取OK*/
//自定義緩存
class MyCache{
private volatile Map<String,Object> map = new HashMap<>();
// 存,寫
public void put(String key,Object value){
System.out.println(Thread.currentThread().getName()+"寫入"+key);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"寫入OK");
}
// 取,讀
public void get(String key){
System.out.println(Thread.currentThread().getName()+"讀取"+key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"讀取OK");
}
}
//出現的問題
/* 9寫入9
7讀取7
6寫入6
3寫入3
4讀取4
0讀取0
8讀取8
8寫入8*/
10.阻塞隊列
阻塞隊列:
BlockingQueue BlockingQueue 不是新的東西
什麼情況下我們會使用 阻塞隊列:多線程併發處理,線程池!
學會使用隊列
添加、移除
四組API(重要)
方式 | 拋出異常 | 有返回值,不拋出異常 | 阻塞 等待 | 超時等待 |
---|---|---|---|---|
添加 | add | offer() | put() | offer(..) |
移除 | remove | poll() | take() | poll(..) |
檢測隊首元素 | element | peak() | - | - |
public static void main(String[] args) throws InterruptedException {
// test1();
// test2();
// test3();
test4();
}
/*
拋出異常
*/
public static void test1(){
//隊列的大小
ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.element()); //查看隊首元素是誰
//java.lang.IllegalStateException: Queue full 跑出異常
// System.out.println(blockingQueue.add("d"));
System.out.println("--------------");
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
//java.util.NoSuchElementException 拋出異常
// System.out.println(blockingQueue.remove());
}
/*
有返回值,不拋出異常
*/
public static void test2(){
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a2"));
System.out.println(blockingQueue.offer("b2"));
System.out.println(blockingQueue.offer("c2"));
System.out.println(blockingQueue.peek()); //檢查隊首元素
//blockingQueue.offer("d2") 返回false 不拋出異常
System.out.println(blockingQueue.offer("d2"));
System.out.println("--------------");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//不會拋出異常 返回 null
System.out.println(blockingQueue.poll());
}
/*
阻塞 等待(一直阻塞)
*/
public static void test3() throws InterruptedException {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//一直阻塞
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
// blockingQueue.put("d"); //隊列沒有位置了,就會一直阻塞
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take()); //沒有這個元素後,就會一直阻塞
}
/*
阻塞 等待(等待超時)
*/
public static void test4() throws InterruptedException {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.offer("a");
blockingQueue.offer("b");
blockingQueue.offer("c");
// blockingQueue.offer("d",2, TimeUnit.SECONDS); //等待2秒 超時後就會退出
System.out.println("------------");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
blockingQueue.poll(2,TimeUnit.SECONDS); //等待超時2秒後就退出
}
SynchronousQueue 同步隊列
沒有容量,
進去一個元素,必須等待取出來之後,才能再往裡面放一個元素!
put、take
package com.jihu.bq;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* 同步隊列
* 和其他的BlockingQueue 不一樣, SynchronousQueue 不存儲元素
* put了一個元素,必須從裡面先take取出來,否則不能在put進去值!
*/
public class SynchronousQueueDemo {
public static void main(String[] args) {
// SynchronousQueue<Object> objects = new SynchronousQueue<>();//同步隊列
BlockingQueue blockingQueue = new SynchronousQueue<>();//同步隊列
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" put 1");
blockingQueue.put(1);
System.out.println(Thread.currentThread().getName()+" put 2");
blockingQueue.put(2);
System.out.println(Thread.currentThread().getName()+" put 3");
blockingQueue.put(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2").start();
}
}
//測試
T1 put 1
T1 put 2
T2=>1
T1 put 3
T2=>2
T2=>3
11. 線程池(重點)
線程池:三大方法、7大參數、4種拒絕策略
池化技術
程式的運行,本質:占用系統的資源! 優化資源的使用!=>池化技術
線程池、連接池、記憶體池、對象池///..... 創建、銷毀。十分浪費資源
池化技術:事先準備好一些資源,有人要用,就來我這裡拿,用完之後還給我。
線程池的好處:
1、降低資源的消耗
2、提高響應的速度
3、方便管理。
線程復用、可以控制最大併發數、管理線程
1.三大方法
線程池:三大方法
package com.jihu.bq;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// Executors 工具類、3大方法
public class Demo01 {
public static void main(String[] args) {
// ExecutorService threadPool = Executors.newSingleThreadExecutor(); //單個線程
ExecutorService threadPool = Executors.newFixedThreadPool(5); //創建一個固定的線程池的大小
// ExecutorService threadPool = Executors.newCachedThreadPool(); // 可伸縮的,遇強則強,遇弱則弱
try {
for (int i = 0; i < 10; i++) {
//使用了線程池之後,使用線程池來創建線程
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception exception) {
exception.printStackTrace();
} finally {
// 線程池用完,程式結束,關閉線程池
threadPool.shutdown();
}
}
}
newSingleThreadExecutor測試結果:
newFixedThreadPool(5)測試結果:
newCachedThreadPool測試結果:
2、7大參數
7大參數
源碼分析
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
// 本質ThreadPoolExecutor()
public ThreadPoolExecutor(int corePoolSize, // 核心線程池大小
int maximumPoolSize, // 最大核心線程池大小
long keepAliveTime,// 超時了沒有人調用就會釋放
TimeUnit unit, //超時單位
BlockingQueue<Runnable> workQueue, //阻塞隊列
ThreadFactory threadFactory, //線程工廠:創建線程的,一般不用動。
RejectedExecutionHandler handler //拒絕策略) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
手動創建一個線程池
package com.jihu.bq02;
import java.util.concurrent.*;
/**
* new ThreadPoolExecutor.AbortPolicy() //銀行滿了,還有人進來,不處理這個人的,拋出異常
* new ThreadPoolExecutor.CallerRunsPolicy() //哪來的去哪裡 main來的就main方法執行
* new ThreadPoolExecutor.DiscardPolicy() //隊列滿了,丟掉任務,不會拋出異常
* new ThreadPoolExecutor.DiscardOldestPolicy() //隊列滿了,嘗試去和最早的競爭,如果最早的快要結束了就可能等會,要不然就會離開,不會拋出異常
*/
public class Demo02 {
public static void main(String[] args) {
//自定義線程池! 工作工需要自己自定義
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()); //隊列滿了,嘗試去和最早的競爭,如果最早的快要結束了就可能等會,要不然就會離開,不會拋出異常
try {
// 最大承載:Deque + max
// 超過 最大承載後 拋出: java.util.concurrent.RejectedExecutionException
for (int i = 0; i < 9; i++) {
// 使用了線程池之後,使用線程池來創建線程
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();// 線程池用完,程式結束,關閉線程池
}
}
}
3、四種拒絕策略
四種拒絕策略
/**
* new ThreadPoolExecutor.AbortPolicy() //銀行滿了,還有人進來,不處理這個人的,拋出異常
* new ThreadPoolExecutor.CallerRunsPolicy() //哪來的去哪裡 main來的就main方法執行
* new ThreadPoolExecutor.DiscardPolicy() //隊列滿了,丟掉任務,不會拋出異常
* new ThreadPoolExecutor.DiscardOldestPolicy() //隊列滿了,嘗試去和最早的競爭,如果最早的快要結束了就可能等會,要不然就會離開,不會拋出異常
*/
小結和拓展
池的最大的大小如何去設置!(下麵代碼即是)
瞭解:IO密集型,CPU密集型:(調優用到的)
package com.jihu.bq02;
import java.util.concurrent.*;
public class Demo02 {
public static void main(String[] args) {
//自定義線程池! 工作工需要自己自定義
// 最大線程到底該如何定義
// 1、CPU 密集型,幾核,就是幾,可以保持CPU的效率最高!
// 2、IO 密集型 > 判斷你程式中十分耗IO的線程,
// 程式 15個大型任務 IO十分占用資源!(io密集型要定義大於15才行,才有其他的線程去執行別的任務)
// 獲取CPU的核數
System.out.println(Runtime.getRuntime().availableProcessors());
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()); //隊列滿了,嘗試去和最早的競爭,如果最早的快要結束了就可能等會,要不然就會離開,不會拋出異常
try {
// 最大承載:Deque + max
// 超過 最大承載後 拋出: java.util.concurrent.RejectedExecutionException
for (int i = 0; i < 9; i++) {
// 使用了線程池之後,使用線程池來創建線程
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();// 線程池用完,程式結束,關閉線程池
}
}
}
12、四大函數式介面(必需掌握)
新時代的程式員必會:lambda表達式、鏈式編程、函數式介面、Stream流式計算
函數式介面: 只有一個方法的介面
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
//超多 @FunctionalInterface
//簡化編程模型,在新版本的框架底層大量應用
//foreach(消費者類的函數式介面)
//
Function函數式介面
代碼測試
package com.jihu.function;
import java.util.function.Function;
/*
* Function 函數型介面, 有一個輸入參數,有一個輸出
* 只要是 函數型介面 可以 用 lambda表達式簡化
*/
public class Demofunction01 {
public static void main(String[] args) {
// Function<String, String> function = new Function<>(){
//
// @Override
// public String apply(String str) {
// return str;
// }
// };
//(str) 為傳入的參數
//{return str;} 裡面為具體的代碼
Function<String, String> function =(str)->{return str;}; //lambada表達式對上面式子的簡化
System.out.println(function.apply("aaa1"));
}
}
斷定型介面:有一個輸入參數,返回值只能是 布爾值!
package com.jihu.function;
import java.util.function.Predicate;
/*
斷定型介面:有一個輸入參數,返回值只能是 布爾值!
*/
public class Demofunction02 {
public static void main(String[] args) {
//判斷字元串是否為空
/* Predicate<String> predicate = new Predicate<>(){
@Override
public boolean test(String str) {
return str.isEmpty();
}
};*/
Predicate<String> predicate =(str)->{return str.isEmpty();};
System.out.println(predicate.test("1as")); //返回false
// System.out.println(predicate.test("")); //返回true
}
}
Consumer 消費型介面
package com.jihu.function;
import java.util.function.Consumer;
/**
* Consumer 消費型介面: 只有輸入,沒有返回值
*/
public class Demofunction03 {
public static void main(String[] args) {
/* Consumer<String> consumer = new Consumer<>() {
@Override
public void accept(String str) {
System.out.println(str);
}
};*/
Consumer<String> consumer =(str)->{
System.out.println(str);
};
consumer.accept("asad");
}
}
Supplier 供給型介面
package com.jihu.function;
import java.util.function.Supplier;
/**
* Supplier 供給型介面 沒有參數,只有返回值
*/
public class Demofunction04 {
public static void main(String[] args) {
/*Supplier<String> supplier = new Supplier<>(){
@Override
public String get() {
return "stras";
}
};*/
Supplier<String> supplier =()->{return "adas";};
System.out.println(supplier.get());
}
}
13、Stream流式運算
什麼是Stream流式運算
大數據:存儲 + 計算
集合、MySQL 本質就是存儲東西的;
計算都應該交給流來操作!
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
private Integer id;
private String name;
private Integer age;
}
package com.jihu.stream;
import java.util.Arrays;
import java.util.List;
/**
* 題目要求:一分鐘內完成此題,只能用一行代碼實現!
* 現在有5個用戶!篩選:
* 1、ID 必須是偶數
* 2、年齡必須大於23歲
* 3、用戶名轉為大寫字母
* 4、用戶名字母倒著排序
* 5、只輸出一個用戶!
*/
public class Trst {
public static void main(String[] args) {
User u1 = new User(1, "a", 21);
User u2 = new User(2, "b", 22);
User u3 = new User(3, "c", 23);
User u4 = new User(4, "d", 24);
User u5 = new User(5, "e", 25);
User u6 = new User(6, "f", 26);
//集合就是存儲 把上述user存入集合中
List<User> list = Arrays.asList(u1, u2, u3, u4, u5,u6);
//計算交給stream流
// lambda表達式、下麵叫 鏈式編程、函數式介面、Stream流式計算
list.stream()
.filter(u->{return u.getId()%2==0;}) //ID 必須是偶數
.filter(u->{return u.getAge()>23;}) //年齡必須大於23歲
.map(u->{return u.getName().toUpperCase();}) //用戶名轉為大寫字母 D F
.sorted((uu1,uu2)->{return uu2.compareTo(uu1);}) // 用戶名字母倒著排序 F D
.limit(1) // 只輸出一個用戶! F
.forEach(System.out::println);
}
}
14、ForkJoin
什麼是 ForkJoin
ForkJoin 在 JDK 1.7 , 並行執行任務!提高效率。大數據量 才使用它!
大數據:Map Reduce (把大任務拆分為小任務)
ForkJoin 特點:工作竊取
這個裡面維護的都是雙端隊列(B執行完後開始執行A的任務,這叫工作竊取,但可能會出現同時都搶一個資源的現象。)
ForkJoin
package com.jihu.forkjoin;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* 求和計算的任務!
* 3000 6000(ForkJoin) 9000(Stream並行流)
* // 如何使用 forkjoin
* // 1、forkjoinPool 通過它來執行
* // 2、計算任務 forkjoinPool.execute(ForkJoinTask task)
* // 3. 計算類要繼承 ForkJoinTask
*/
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start;
private Long end;
//臨界值
private Long temp = 10000L;
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
//計算方法
@Override
protected Long compute() {
if ((end-start) < temp){
Long sum = 0L;
for (Long i=start;i<=end;i++){
sum+=i;
}
return sum;
}else { //forkjoin 遞歸
long middle = (end+start)/2; //中間值
ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
task1.fork(); //拆分任務,把任務壓入線程隊列
ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, end);
task2.fork();//拆分任務,把任務壓入線程隊列
return task1.join()+task2.join();
}
}
}
測試
package com.jihu.forkjoin;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// test1(); //11583
// test2(); //9751
test3(); //899
}
// 普通程式員
public static void test1(){
Long sum = 0L;
long start = System.currentTimeMillis();
for (Long i = 1L; i <10_0000_0000 ; i++) {
sum+=i;
}
long end = System.currentTimeMillis();
System.out.println("sum="+sum+"時間:"+(end-start));
}
// 會使用ForkJoin
public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask task = new ForkJoinDemo(0L,10_0000_0000L);
ForkJoinTask submit = forkJoinPool.submit(task); //提交任務
Long sum = (Long) submit.get();
long end = System.currentTimeMillis();
System.out.println("sum="+sum+"時間:"+(end-start));
}
public static void test3(){
long start = System.currentTimeMillis();
//Stream並行流 parallel:表示並行
long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum="+sum+"時間:"+(end-start));
}
}
15、非同步回調
Future 設計的初衷: 對將來的某個事件的結果進行建模
package com.jihu.future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* 非同步調用: CompletableFuture
* // 非同步執行
* // 成功回調
* // 失敗回調
*/
public class Demo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//沒有返回值的 runAsync 非同步回調
//Void跟void不同,Void是包裝類,和Integer類似
/* CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"runAsync=>Void");
});
System.out.println("1111");
completableFuture.get(); // 獲取阻塞執行結果 */
//輸出結果為
// 1111
// ForkJoinPool.commonPool-worker-3runAsync=>Void
//有返回值的 supplyAsync 非同步回調
// ajax,成功和失敗的回調
// 返回的是錯誤信息;
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {