一、任務的取消於關閉 1、中斷Thread 1.每個線程都有一個boolean類型的中斷狀態。true則是中斷狀態中 interrupt:發出中斷請求;isInterrupt:返回中斷狀態;interrupted:清除中斷狀態 2.JVM中的阻塞方法會檢查線程中斷狀態,其響應方法為:清除中斷狀態,拋 ...
一、任務的取消於關閉
1、中斷Thread
1.每個線程都有一個boolean類型的中斷狀態。true則是中斷狀態中
interrupt:發出中斷請求;isInterrupt:返回中斷狀態;interrupted:清除中斷狀態
2.JVM中的阻塞方法會檢查線程中斷狀態,其響應方法為:清除中斷狀態,拋出InterruptedException異常,表示阻塞操作被中斷結束 ;但JVM不保證阻塞方法何時檢測到線程的中斷狀態
3.中斷的理解:不會真正的中斷一個正在運行的線程,而只是發出請求,具體的中斷由任務自己處理
通過中斷來取消線程通常是最好的方法
public class PrimeProducer extends Thread { private final BlockingQueue<BigInteger> queue; PrimeProducer(BlockingQueue<BigInteger> queue) { this.queue = queue; } public void run() { try { BigInteger p = BigInteger.ONE; while (!Thread.currentThread().isInterrupted()) queue.put(p = p.nextProbablePrime()); } catch (InterruptedException consumed) { /* Allow thread to exit */ //如果捕獲到中斷異常,則由線程自己退出 } } public void cancel() { interrupt(); } }
2、不可中斷的阻塞的中斷
如:Socket I/O操作,即使設置了中斷請求,也不會中斷,但是close 套接字,會使其拋出異常,達到中斷效果;因此我們要重寫中斷方法
//自定義callable實現類 public abstract class SocketUsingTask <T> implements CancellableTask<T> { private Socket socket; protected synchronized void setSocket(Socket s) { socket = s; } //取消方法 public synchronized void cancel() { try { if (socket != null) socket.close(); } catch (IOException ignored) { } } //新建實例的方法 public RunnableFuture<T> newTask() { return new FutureTask<T>(this) { public boolean cancel(boolean mayInterruptIfRunning) { try { SocketUsingTask.this.cancel(); } finally { return super.cancel(mayInterruptIfRunning); } } }; } } //自定義callable介面 interface CancellableTask <T> extends Callable<T> { void cancel(); RunnableFuture<T> newTask(); } //自定義 執行池 class CancellingExecutor extends ThreadPoolExecutor { ...... //通過改寫newTaskFor 返回自己的Callable protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { if (callable instanceof CancellableTask) return ((CancellableTask<T>) callable).newTask(); else return super.newTaskFor(callable); } }
3、通過自定義取消計時任務
private static final ScheduledExecutorService cancelExec = newScheduledThreadPool(1); /** * * @param r 任務 * @param timeout 超時時間 * @param unit TimeUnit * @throws InterruptedException */ public static void timedRun(final Runnable r,long timeout, TimeUnit unit) throws InterruptedException { class RethrowableTask implements Runnable { //通過一個volatile變數,來存儲線程是否異常 private volatile Throwable t; public void run() { try { r.run(); } catch (Throwable t) { this.t = t; } } private void rethrow() { if (t != null) throw launderThrowable(t); } } RethrowableTask task = new RethrowableTask(); final Thread taskThread = new Thread(task); taskThread.start(); //延時timeout個unit單位後 執行線程中斷 cancelExec.schedule(() -> taskThread.interrupt(), timeout, unit); //無論如何都等待;如果線程不響應中斷,那麼通過join等待任務線程timeout時間後 不再等待,回到調用者線程 taskThread.join(unit.toMillis(timeout)); //如果 任務線程中有異常,則拋出 task.rethrow(); }
註意:依賴於join,任務超時join退出 和 任務正常join推出 無法進行判斷
4、通過Futrue來實現取消計時任務
private static final ExecutorService taskExec = Executors.newCachedThreadPool(); public static void timedRun(Runnable r,long timeout, TimeUnit unit) throws InterruptedException { Future<?> task = taskExec.submit(r); try { //通過Futrue.get(超時時間),捕獲相應的異常來處理計時運行和取消任務 task.get(timeout, unit); } catch (TimeoutException e) { // task will be cancelled below } catch (ExecutionException e) { // exception thrown in task; rethrow throw launderThrowable(e.getCause()); } finally { // Harmless if task already completed task.cancel(true); // interrupt if running } }
二、停止基於線程的服務
1.通常,服務不能直接中斷,造成服務數據丟失
2.線程池服務也不能直接中斷
1、日誌服務
標準的生產者,消費者模式
public class LogService { private final BlockingQueue<String> queue; private final LoggerThread loggerThread; private final PrintWriter writer; private boolean isShutdown; private int reservations; public LogService(Writer writer) { this.queue = new LinkedBlockingQueue<String>(); this.loggerThread = new LoggerThread(); this.writer = new PrintWriter(writer); } public void start() { loggerThread.start(); } public void stop() { synchronized (this) { isShutdown = true; } loggerThread.interrupt(); //發出中斷 } public void log(String msg) throws InterruptedException { synchronized (this) { if (isShutdown){ throw new IllegalStateException(/*...*/); } ++reservations; //保存的正確的在隊列中的日誌數量 } queue.put(msg); //將日誌放入隊列 } private class LoggerThread extends Thread { public void run() { try { while (true) { try { synchronized (LogService.this) { if (isShutdown && reservations == 0) { break; } } String msg = queue.take(); synchronized (LogService.this) { --reservations; } writer.println(msg); } catch (InterruptedException e) { /* retry */ //捕獲了中斷請求,但為了將剩餘日誌輸出,不做處理,直到計數器 == 0時,關閉 } } } finally { writer.close(); } } } }
2、ExecutorService中斷
shutDown和shutDownNow
通常,將ExecetorService封裝;如LogService,使其具有自己的生命周期方法
shutDownNow的局限性:不知道當前池中的線程狀態,返回未開始的任務,但不能返回已開始未結束的任務
public class TrackingExecutor extends AbstractExecutorService { private final ExecutorService exec; private final Set<Runnable> tasksCancelledAtShutdown = Collections.synchronizedSet(new HashSet<Runnable>()); public TrackingExecutor() { exec = Executors.newSingleThreadExecutor(); } /*public TrackingExecutor(ExecutorService exec) { this.exec = exec; }*/ public void shutdown() { exec.shutdown(); } public List<Runnable> shutdownNow() { return exec.shutdownNow(); } public boolean isShutdown() { return exec.isShutdown(); } public boolean isTerminated() { return exec.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return exec.awaitTermination(timeout, unit); } public List<Runnable> getCancelledTasks() { if (!exec.isTerminated()) throw new IllegalStateException(/*...*/); return new ArrayList<Runnable>(tasksCancelledAtShutdown); } public void execute(final Runnable runnable) { exec.execute(new Runnable() { public void run() { try { runnable.run(); } finally { if (isShutdown() && Thread.currentThread().isInterrupted()) tasksCancelledAtShutdown.add(runnable); } } }); } @Test public void test() throws InterruptedException { ExecutorService executorService = Executors.newSingleThreadExecutor(); TrackingExecutor trackingExecutor = new TrackingExecutor(); trackingExecutor.execute(new Runnable() { @Override public void run() { try { Thread.sleep(2000); System.err.println("123123"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); //設置狀態 或繼續拋,在execute中處理 e.printStackTrace(); } finally { } } }); List<Runnable> runnables = trackingExecutor.shutdownNow(); trackingExecutor.awaitTermination(10,TimeUnit.SECONDS); List<Runnable> cancelledTasks = trackingExecutor.getCancelledTasks(); System.err.println(cancelledTasks.size()); } }
三、處理非正常線程終止
1.未捕獲的Exception導致的線程終止
1.手動處理未捕獲的異常
2.通過Thread的API UncaughExceptionHandler,能檢測出某個線程又遇見未捕獲而導致異常終止
註意:預設是將異常的的堆棧信息 輸出到控制台;自定義的Handler:implements Thread.UncaughExceptionHandler覆寫方法
可以為每個線程設置,也可以設置一個全局的ThreadGroup
Thread.setUncaughtExceptionHandler/Thread.setDefaultUncaughtExceptionHandler
2.JVM退出、守護線程等