編程問題中相當大的一部分都可以通過使用順序編程來解決。 對於某些問題,如果能夠並行地執行程式中的多個部分,則會變得非常方便。 並行編程可以使程式執行速度得到極大地提高。 當並行執行的任務彼此開始產生互相干涉時,實際的併發問題就會接踵而至。 Web伺服器經常包含多個處理器,而併發是充分利用這些處理器的 ...
編程問題中相當大的一部分都可以通過使用順序編程來解決。
對於某些問題,如果能夠並行地執行程式中的多個部分,則會變得非常方便。
並行編程可以使程式執行速度得到極大地提高。
當並行執行的任務彼此開始產生互相干涉時,實際的併發問題就會接踵而至。
Web伺服器經常包含多個處理器,而併發是充分利用這些處理器的理想方式。
1.基本的線程機制
併發編程使我們可以將程式劃分為多個分離的、獨立運行的任務。
通過使用多線程機制,這些獨立任務中的每一個都將由執行線程來驅動。
一個線程就是在進程中的一個單一的順序控制流。
單個進程可以擁有多個併發執行的任務,但是你的程式使得每個任務都好像有自己的CPU一樣。其底層機制是切分CPU時間,但通常你不需要考慮它。
在使用線程時,CPU將輪流給每個任務分配其占用時間,每個任務都覺得自己在一直占用CPU,但事實上CPU時間是劃分成片段分配給了所有的任務。
多任務和多線程往往是使用多處理器系統的最合理方式。
1.1 定義任務
線程可以驅動任務,你需要一種描述任務的方式,這可以由Runnable
介面來提供。
public class LiftOff implements Runnable {
protected int countDown = 10; // Default
private static int taskCount = 0;
private final int id = taskCount++;
public LiftOff() {}
public LiftOff(int countDown) {
this.countDown = countDown;
}
public String status() {
return "#" + id + "(" +
(countDown > 0 ? countDown : "Liftoff!") + "), ";
}
public void run() {
while(countDown-- > 0) {
System.out.print(status());
Thread.yield();
}
}
}
任務的run()
方法通常總會有某種形式的迴圈,使得任務一直運行下去直到不再需要。
通常,run()
被寫成無限迴圈的形式,這就意味著,除非有某個條件使得run()
終止,否則它將永遠運行下去。
當從Runnable
導出一個類時,它必須具有run()
方法,但是這個方法並無特殊之處——它不會產生任何內在的線程能力。要實現線程的行為,你必須顯式地將一個任務附著到線程上。
1.2 Thread類
將Runnable對象轉變為工作任務的傳統方式是把它提交給一個Thread構造器。
public class BasicThreads {
public static void main(String[] args) {
Thread t = new Thread(new LiftOff());
t.start();
System.out.println("Waiting for LiftOff");
}
}
調用Thread對象的start()
方法為該線程執行必需的初始化操作,然後調用Runnable的run()
方法,以便在這個新線程中啟動任務。
你可以很容易地添加更多的線程去驅動更多的任務。
public class MoreBasicThreads {
public static void main(String[] args) {
for(int i = 0; i < 5; i++)
new Thread(new LiftOff()).start();
System.out.println("Waiting for LiftOff");
}
}
這個程式一次運行的結果可能與另一次運行的結果不同,因為線程調度機制是非確定性的。
1.3 繼承Thread類
在非常簡單的情況下,你可以直接繼承Thread類來代替實現Runnable介面的方式。
public class SimpleThread extends Thread {
private int countDown = 5;
private static int threadCount = 0;
public SimpleThread() {
// Store the thread name:
super(Integer.toString(++threadCount));
start();
}
public String toString() {
return "#" + getName() + "(" + countDown + "), ";
}
public void run() {
while(true) {
System.out.print(this);
if(--countDown == 0)
return;
}
}
public static void main(String[] args) {
for(int i = 0; i < 5; i++)
new SimpleThread();
}
}
實現介面使得你可以繼承另一個不同的類,而從Tread繼承則不行。
1.4 優先順序
Java線程有優先順序,優先順序高的線程會獲得較多的運行機會。
Java線程的優先順序用整數表示,取值範圍是1~10,Thread類有以下三個靜態常量:
public class Thread implements Runnable {
//...
/**
* The minimum priority that a thread can have.
*/
public final static int MIN_PRIORITY = 1;//線程的最低優先順序
/**
* The default priority that is assigned to a thread.
*/
public final static int NORM_PRIORITY = 5;//線程的預設優先順序
/**
* The maximum priority that a thread can have.
*/
public final static int MAX_PRIORITY = 10;//線程的最高優先順序
//...
public final void setPriority(int newPriority) {
ThreadGroup g;
checkAccess();
if (newPriority > MAX_PRIORITY || newPriority < MIN_PRIORITY) {
throw new IllegalArgumentException();
}
if((g = getThreadGroup()) != null) {
if (newPriority > g.getMaxPriority()) {
newPriority = g.getMaxPriority();
}
setPriority0(priority = newPriority);
}
}
public final int getPriority() {
return priority;
}
//...
}
Thread類的setPriority()
和getPriority()
方法分別用來設置和獲取線程的優先順序。
1.5 線程的狀態
一個線程可以處於以下四種狀態之一:
- 新建(new):當線程被創建時,它只會短暫地處於這種狀態。
- 就緒(Runnable):在這種狀態下,只要調度器把時間片分配給線程,線程就可以運行。
- 阻塞(Blocked):當線程處於阻塞狀態時,調度器將忽略線程,不會分配給線程任何CPU時間。直到線程重新進入了就緒狀態,它才有可能執行操作。
- 死亡(Dead):處於死亡或終止狀態的線程將不再是可調度的,並且再也不會得到CPU時間,它的任務已經結束,或不再是可運行的(任務死亡的通常方式是從run()方法返回)。
一個任務進入阻塞狀態,可能有如下原因:
- 通過調用
sleep(milliseconds)
使任務進入休眠狀態,在這種情況下,任務在指定時間內不會運行。 - 你通過調用
wait()
使線程掛起。直到線程得到了notify()
或notifyAll()
消息,線程才會進入就緒狀態。 - 任務在等待某個輸入/輸出完成。
- 任務試圖在某個對象上調用其同步控制方法,但是對象鎖不可用,因為另一個任務已經獲取了這個鎖。
2.解決共用資源競爭
使用線程時的一個基本問題:你永遠都不知道一個線程何時在運行。
對於併發工作,你需要某種方式來防止兩個任務訪問相同的資源。
防止這種衝突的方法就是當資源被一個任務使用時,在其上加鎖。
第一個訪問某項資源的任務必須鎖定這項資源,使其他任務在其被解鎖之前,就無法訪問它了。而在其被解鎖之時,另一個任務就可以鎖定並使用它。
Java以提供關鍵字synchronized的形式,為防止資源衝突提供內置支持。當任務要執行被synchronized關鍵字保護的代碼片段的時候,它將檢查鎖是否可用,然後獲取鎖,執行代碼,釋放鎖。
共用資源一般是以對象形式存在的記憶體片段,但也可以是文件、輸入/輸出埠,或者是印表機。
要控制對共用資源的訪問,得先把它包裝進一個對象。然後把所有要訪問這個資源的方法標記為synchronized。
當在對象上調用其任意synchronized方法的時候,此對象都被加鎖。
這時該對象上的其他synchronized方法只有等到前一個方法調用完畢並釋放了鎖之後才能被調用。
一個任務可以多次獲得對象的鎖。
如果一個方法在同一個對象上調用了第二個方法,後者又調用了同一對象上的另一個方法,就會發生這種情況。
JVM負責跟蹤對象被加鎖的次數。
如果一個對象被解鎖,其計數變為0。在任務第一次給對象加鎖的時候,計數變為1。每當這個相同的任務在這個對象上獲得鎖時,計數都會遞增。
只有首先獲得了鎖的任務才能允許繼續獲取多個鎖。
每當任務離開一個synchronized方法,計數遞減,當計數為零的時候,鎖被完全釋放,此時別的任務就可以使用此資源。
3.JUC(java.util.concurrent)
3.1 volatile
如果多個任務在同時訪問某個域,那麼這個域就應該是volatile的,否則,這個域就應該只能經由同步來訪問。
如果一個域完全由synchronized方法或語句塊來防護,那就不必將其設置為volatile的。
/*
* 一、volatile 關鍵字:當多個線程進行操作共用數據時,可以保證記憶體中的數據可見。
* 相較於 synchronized 是一種較為輕量級的同步策略。
*
* 註意:
* 1. volatile 不具備“互斥性”
* 2. volatile 不能保證變數的“原子性”
*/
public class TestVolatile {
public static void main(String[] args) {
ThreadDemo td = new ThreadDemo();
new Thread(td).start();
while(true){
if(td.isFlag()){
System.out.println("------------------");
break;
}
}
}
}
class ThreadDemo implements Runnable {
private volatile boolean flag = false;
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
flag = true;
System.out.println("flag=" + isFlag());
}
public boolean isFlag() {
return flag;
}
public void setFlag(boolean flag) {
this.flag = flag;
}
}
3.2 原子類
Java SE5引入了諸如AtomicInteger
、AtomicLong
、AtomicReference
等特殊的原子性變數類。
這些類被調整為可以使用在某些現代處理器上的可獲得的原子性。對於常規編程來說,它們很少會派上用場,但是在涉及性能調優時,它們就有大有用武之地了。
public class TestAtomicDemo {
public static void main(String[] args) {
AtomicDemo ad = new AtomicDemo();
for (int i = 0; i < 10; i++) {
new Thread(ad).start();
}
}
}
class AtomicDemo implements Runnable{
// private volatile int serialNumber = 0;
private AtomicInteger serialNumber = new AtomicInteger(0);
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
System.out.println(getSerialNumber());
}
public int getSerialNumber(){
return serialNumber.getAndIncrement();
}
}
3.3 線程池
使用Executor
/*
* 一、線程池:提供了一個線程隊列,隊列中保存著所有等待狀態的線程。避免了創建與銷毀額外開銷,提高了響應的速度。
*
* 二、線程池的體繫結構:
* java.util.concurrent.Executor : 負責線程的使用與調度的根介面
* |--**ExecutorService 子介面: 線程池的主要介面
* |--ThreadPoolExecutor 線程池的實現類
* |--ScheduledExecutorService 子介面:負責線程的調度
* |--ScheduledThreadPoolExecutor :繼承 ThreadPoolExecutor, 實現 ScheduledExecutorService
*
* 三、工具類 : Executors
* ExecutorService newFixedThreadPool() : 創建固定大小的線程池
* ExecutorService newCachedThreadPool() : 緩存線程池,線程池的數量不固定,可以根據需求自動的更改數量。
* ExecutorService newSingleThreadExecutor() : 創建單個線程池。線程池中只有一個線程
*
* ScheduledExecutorService newScheduledThreadPool() : 創建固定大小的線程,可以延遲或定時的執行任務。
*/
public class TestThreadPool {
public static void main(String[] args) throws Exception {
//1. 創建線程池
ExecutorService pool = Executors.newFixedThreadPool(5);
ThreadPoolDemo tpd = new ThreadPoolDemo();
//2. 為線程池中的線程分配任務
for (int i = 0; i < 10; i++) {
pool.submit(tpd);
}
//3. 關閉線程池
pool.shutdown();
}
}
class ThreadPoolDemo implements Runnable{
private int i = 0;
@Override
public void run() {
while(i <= 100){
System.out.println(Thread.currentThread().getName() + " : " + i++);
}
}
}
從任務中產生返回值
如果你希望任務在完成時能夠返回一個值,那麼可以實現Callable介面而不是Runnable介面。
在Java SE5中引入的Callable是一種具有類型參數的泛型,它的類型參數表示的是從方法call()
中返回的值,並且必須使用ExecutorService.submit()
方法調用它。
public class TestThreadPool {
public static void main(String[] args) throws Exception {
//1. 創建線程池
ExecutorService pool = Executors.newFixedThreadPool(5);
List<Future<Integer>> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Future<Integer> future = pool.submit(new Callable<Integer>(){
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i <= 100; i++) {
sum += i;
}
return sum;
}
});
list.add(future);
}
pool.shutdown();
for (Future<Integer> future : list) {
System.out.println(future.get());
}
}
}
class ThreadPoolDemo implements Runnable{
private int i = 0;
@Override
public void run() {
while(i <= 100){
System.out.println(Thread.currentThread().getName() + " : " + i++);
}
}
}
3.4 同步鎖
public class TestLock {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(ticket, "1號視窗").start();
new Thread(ticket, "2號視窗").start();
new Thread(ticket, "3號視窗").start();
}
}
class Ticket implements Runnable{
private int tick = 100;
private Lock lock = new ReentrantLock();
@Override
public void run() {
while(true){
lock.lock(); //上鎖
try{
if(tick > 0){
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread().getName() + " 完成售票,餘票為:" + --tick);
}
}finally{
lock.unlock(); //釋放鎖
}
}
}
}
等待喚醒機制(註意避免虛假喚醒問題):
/*
* 生產者和消費者案例
*/
public class TestProductorAndConsumer {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Productor pro = new Productor(clerk);
Consumer cus = new Consumer(clerk);
new Thread(pro, "生產者 A").start();
new Thread(cus, "消費者 B").start();
// new Thread(pro, "生產者 C").start();
// new Thread(cus, "消費者 D").start();
}
}
//店員
class Clerk{
private int product = 0;
//進貨
public synchronized void get(){
while(product >= 1){//為了避免虛假喚醒問題,應該總是使用在迴圈中
System.out.println("產品已滿!");
try {
this.wait();
} catch (InterruptedException e) {
}
}
System.out.println(Thread.currentThread().getName() + " : " + ++product);
this.notifyAll();
}
//賣貨
public synchronized void sale(){
while(product <= 0){
System.out.println("缺貨!");
try {
this.wait();
} catch (InterruptedException e) {
}
}
System.out.println(Thread.currentThread().getName() + " : " + --product);
this.notifyAll();
}
}
//生產者
class Productor implements Runnable{
private Clerk clerk;
public Productor(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
clerk.get();
}
}
}
//消費者
class Consumer implements Runnable{
private Clerk clerk;
public Consumer(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
clerk.sale();
}
}
}
3.5 生產者與消費者
使用互斥並允許任務掛起的基本類是Condition,你可以通過在Condition上調用await()
來掛起一個任務。
當外部條件發生變化,意味著某個任務應該繼續執行時,你可以通過調用signal()
來通知這個任務,從而喚醒一個任務,或者調用signalAll()
來喚醒所有在這個Condition上被其自身掛起的任務。
public class TestProductorAndConsumerForLock {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Productor pro = new Productor(clerk);
Consumer con = new Consumer(clerk);
new Thread(pro, "生產者 A").start();
new Thread(con, "消費者 B").start();
// new Thread(pro, "生產者 C").start();
// new Thread(con, "消費者 D").start();
}
}
class Clerk {
private int product = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
// 進貨
public void get() {
lock.lock();
try {
if (product >= 1) { // 為了避免虛假喚醒,應該總是使用在迴圈中。
System.out.println("產品已滿!");
try {
condition.await();
} catch (InterruptedException e) {
}
}
System.out.println(Thread.currentThread().getName() + " : "
+ ++product);
condition.signalAll();
} finally {
lock.unlock();
}
}
// 賣貨
public void sale() {
lock.lock();
try {
if (product <= 0) {
System.out.println("缺貨!");
try {
condition.await();
} catch (InterruptedException e) {
}
}
System.out.println(Thread.currentThread().getName() + " : "
+ --product);
condition.signalAll();
} finally {
lock.unlock();
}
}
}
// 生產者
class Productor implements Runnable {
private Clerk clerk;
public Productor(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.get();
}
}
}
// 消費者
class Consumer implements Runnable {
private Clerk clerk;
public Consumer(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
clerk.sale();
}
}
}