結構化併發應用程式-Executor框架的理解與使用

来源:https://www.cnblogs.com/mingoXon/archive/2023/03/18/16993796.html
-Advertisement-
Play Games

前言 在我們實際工作過程中,往往會將大的任務劃分成幾個小的子任務,待所有子任務完成之後,再整合出大任務的結果.(例如: 新增直播課的場景),任務的性質通常是多種多樣的,這裡列舉一些任務的常見性質. 從資源使用的角度: CPU密集型 (枚舉素數) I/O密集型 (文件上傳下載) 從執行過程的角度: 依 ...


目錄

前言

在我們實際工作過程中,往往會將大的任務劃分成幾個小的子任務,待所有子任務完成之後,再整合出大任務的結果.(例如: 新增直播課的場景),任務的性質通常是多種多樣的,這裡列舉一些任務的常見性質.

從資源使用的角度:

  • CPU密集型 (枚舉素數)
  • I/O密集型 (文件上傳下載)

從執行過程的角度:

  • 依賴其他有限資源(資料庫連接池,文件描述符)/不依賴其他有限資源
  • 沒有返回值(寫日誌,logService,MesageService)
  • 有返回值(計算結果)
  • 處理過程中可能拋異常(異常要如何處理)
  • 可取消的任務/不可取消的任務

從執行時間的角度:

  • 執行時間短(枚舉100以內的素數)
  • 執行時間長(資料庫調用)
  • 永遠無法結束(爬蟲任務)
  • 限時任務,需要儘快響應(H5端介面,GUI點擊事件)
  • 定時任務(Job)

任務是對現實問題的抽象,其對應程式中的某些方法,而方法的執行需要調用棧.

從Java記憶體模型圖中可以看出,Java線程為任務的執行提供了所需的方法調用棧,其中包括本地方法調用棧和Java方法調用棧,在32位系統中通常占0.5M,而在64位系統中通常占1M+10幾KB的內核數據結構.

而且有的操作系統也會對一個進程能創建的線程數量進行限制. 因此我們並不能無限制的創建線程,線程是一種共用資源,需要統一維護和調度,以便使資源利用率更加高效,這便有了線程池的概念.

Java記憶體模型圖:

註: 很多服務端的應用程式比如MySQL,Web伺服器都使用了線程池技術.
然而也有例外,對於耗時較短的任務,比如僅有記憶體操作,導致線程的維護時間/任務的執行時間比值偏大,這類任務就不適合使用多線程技術,例如Redis服務.
或者對於需要確保線程安全性的,比如GUI開發包,也是使用單線程,例如Swing開發包,JavaScript等.

對於任務的執行,我們往往會有許多需求,例如觀察任務的執行狀態,獲取任務的執行結果或者執行過程中拋出的異常信息,而對於長時間執行的任務,可能還會需要暫停任務,取消任務,限時等

jdk中有許多執行時間長的任務,例如,Thread.join, Object.wait, BlockingQueue.poll,Future.get,這些任務的介面設計也體現了這些需求例如lock介面:

public void lock(); //基於狀態的介面,當無法獲取鎖時,掛起當前線程
public void lockInterruptibly() throws InterruptedException; // 通過拋出中斷異常來響應中斷
public boolean tryLock();//用於快速判斷釋放可加鎖,用於輪詢
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException; //可限時的操作

下麵列舉一些任務的執行需要考慮的問題.
任務的執行策略:

  • 任務在什麼線程中執行
  • 任務的執行順序,FIFO還是按優先順序
  • 多少個任務可以併發執行
  • 任務過多導致系統過載,選擇拒絕哪個任務,即任務的拒絕策略
  • 如何通知應用程式有任務被拒絕
  • 如何通知應用程式任務的執行結果,包括成功的結果和失敗的結果

為了應對這些繁雜的現實需求,jdk為我們提供了Executor框架.通過這個中間人,將任務的提交和實際執行策略解耦,並且提供了對生命周期的支持(ExecutorService),客戶端只需要關註任務的構建和任務的提交,由中間人來關註實際的執行策略。從而封裝了任務執行的複雜性。

本文主要介紹Java平臺提供的Executor執行框架,其中Runable表示任務,FutureTask表示任務的執行結果,ThreadPoolExecutor表示具體的任務執行策略.

任務的描述

Executor框架中,Runable介面表示任務,但是這個任務沒有返回值且不能拋出異常,而Callable介面卻可以.所以Executors工具類提供了RunableAdapter適配器,通過callalbe(Runable)方法,將 runable轉為 callable.

任務描述
Runnable可執行的任務,無返回值,不可拋出異常
Callable可執行的任務,有返回值,可以拋出異常
FutureTask可執行的任務,可以管理任務的執行狀態和讀取任務的執行結果

為了對任務維護任務的運行狀態以及非同步獲取任務的運行結果,Executor框架提供了Future類,該類表示一個非同步任務的計算結果.提供了一些方法:

  • 獲取任務執行結果 get(), get(long,TimeUnit)
  • 取消任務 cancel(boolean)
  • 判斷任務是否取消,判斷任務是否完成 isCancelled(),isDone()
    同時也提供了記憶體一致性保證: Future.get()之前的操作, happen-before Future.get()之後的操作

FutureTask實現了Future介面和Runable介面,表示一個可取消的任務,AbstractExecutorService正是通過將Runable封裝成FutureTask,來管理和維護任務的狀態以及獲取任務的執行結果,下麵介紹jdk1.8中FutureTask的實現.

AbstractExecutorService:

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

FutureTask的設計與實現

在jdk1.8之前的版本中為了簡介,依賴AQS來實現FutureTask,但在jdk1.8後,則放棄使用,通過WaitNode鏈表,來維護等待結果的線程.

FutureTask源碼
public class FutureTask<V> implements RunnableFuture<V> {
    /*
     * FutureTask是一個併發安全類,有併發控制機制
     * 先前版本為了簡潔,使用AQS來實現,jdk1.8後的併發控制使用 一個state 域,通過CAS操作state,同時通過一個簡單的stack來維護 waiting threads*/
    private volatile int state;
    // 實際的任務
    private Callable<V> callable;

    // 任務的執行結果或者拋出的異常,通過get()獲取這個欄位
    // 不需要 volatile,不會有併發讀寫該欄位的情況
    private Object outcome;
    // 在run()的時候 會set runner
    private volatile Thread runner;

    // 一個簡單的等待隊列,為什麼不用AQS了? AQS太重了
    private volatile WaitNode waiters;
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;

        WaitNode() {
            thread = Thread.currentThread();
        }
    }
}

FutureTask狀態機

瞭解了一個類的狀態機,也就大致瞭解了類的工作過程,FutureTask的狀態機如下所示
image

狀態描述
NEW初始任務狀態
COMPLETING任務已執行完,正在設置outcome
NORMAL任務正常執行完成
EXCEPTIONAL任務執行過程中拋出異常,工作線程終止
INTERRUPTED執行任務的工作線程收到中斷請求

思考一個問題,為什麼要有一個COMLETING中間態?

為了維護複合操作的原子性:設置outcome的值和更新任務狀態需要原子操作
 protected void set(V v) {
        // 通過CAS先check下,確保狀態轉換是原子op,同時也確保outcome=v 和設置狀態的值這一對複合操作的原子性
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // 進來後就是單線程環境了
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

FutureTask幾個關鍵方法

run(),cancel(),awaiDone()
public void run() {
        // 先驗條件
        if (state != NEW ||
        // set runner
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                        null, Thread.currentThread()))
            return;
 
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    // 任務執行出錯
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    // 執行成功 set outcome
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // 確保不會丟失中斷信號
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
 
 public boolean cancel(boolean mayInterruptIfRunning) {
        // cancel: new -> interrupting->interrupted,或者 new -> cancelled
        if (!(state == NEW &&
                UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                        mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            // check then action, 複合操作, 如果失敗則說明此時任務的狀態不是 new了,返回false,即取消失敗
            return false;
        try { // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    // 通過給 執行該任務的線程 發送中斷信號來取消任務
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    // 發送完後預設置為 interrupted, 表示 信號已發過去了,但任務不一定能停下來,需要任務自己判斷這個信號
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
 
 public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            // 任務未完成前,通過LockSupport.park等待任務完成
            s = awaitDone(false, 0L);
        return report(s);
    }
 
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V) x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable) x);
    }
 
/**
     * 可中斷的方法
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     */
    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
        // 可中斷方法的大部分實現,都是通過拋InterruptedException()來響應中斷,註意Thread.interrupted()會清除中斷信號
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
 
            int s = state;
            // 如果任務 達到了終態,即isDone()了, 即 S>COMPLETIOG,返回 isDone() => s>competing
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
                // 正在寫結果, 馬上就結束了
            } else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                // 任務還未開始, 即 s=NEW 時,此時創建 等待線程節點,再過一次前面的操作 到下一步
                q = new WaitNode();
            else if (!queued)
                // 新增的節點未入隊,將節點入隊,入隊成功後再過一次前面的操作 到下一步
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                        q.next = waiters, q);
            else if (timed) {
                // 如果過了等待時間了,不等了, 把前面構建的節點從等待隊列中刪除,返回 state
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                // 否則 限時阻塞當前線程,等待任務完成時被喚醒
                LockSupport.parkNanos(this, nanos);
            } else
                // timed=false時, 永遠阻塞當前線程,等待任務完成時被喚醒
                LockSupport.park(this);
        }
    }

ThreadPoolExecutor的設計與實現

簡介

ThreadPoolExecutor = ThreadPool + Executor

Executor: 其中僅有一個execute(Runable) 方法,工作模式為生產者-消費者模式,提交(submit)任務的客戶端相當於生成者,執行任務的線程(worker)則相當於消費者。

ThreadPool: 從字面意思來看是一個線程的容器,用於管理和維護工作者線程。線程池的工作與任務隊列(work queue)密切相關的,其中任務隊列保存了所有等待執行的任務。
ThreadPoolExecutor即以生產者-消費者為工作模型,基於線程池實現的執行器。

由簡介我們可以瞭解到,一般線程池的實現涉及兩個關鍵組件:work queue,workers。而在ThreadPoolExecutor的設計和實現中,其分別對應BlockingQueue介面,Worker類。

線程池的實現所需關鍵組件ThreadPoolExecutor
work queueBlockingQueue
workerfinal class Worker extends AbstractQueuedSynchronizer implements Runnable{}

接下來將從四個方面入手,介紹ThreadPoolExecutor的設計與實現,體會大師們(Doug Lea與JCP Expert Group)是如何思考和解決線程池問題的。

  • 類的結構與狀態
  • 任務的提交與調度
  • 線程的創建與執行
  • 服務的關閉,任務的取消,線程的回收

類的描述與狀態

在開始介紹ThreadPoolExecutor前,有必要先瞭解下Executor框架的其他組成部分。

Executor介面:框架的核心介面,其中只包含一個execute方法

public interface Executor {

   /** 
     * @throws RejectedExecutionException if this task cannot be accepted for execution   
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

ExecutorService介面:線程池作為一個服務需要有服務的狀態維護等操作,這些操作被放到了這個介面,例如shutdown(),shutdownNow(),awaitTermination(),這裡也給出了服務關閉方法。

public static void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown(); // Disable new tasks from being submitted
        try {
            // Wait a while for existing tasks to terminate
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!pool.awaitTermination(60, TimeUnit.SECONDS))
                    System.err.println("Pool did not terminate");
            }
        } catch (InterruptedException ie) {
            pool.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }

AbstractExecutorService類: 提供ExecutorService的基本實現。例如:通過將任務封裝成FutureTask,實現submit方法。任務的批量執行方法:invokeAll,invokeAny的通用實現等。

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    /** 註意這裡已經說明瞭該介面可能會拋出此異常,但我們常常會忘記處理此異常而導致報錯,但我們卻記得NPE的處理
     * @throws RejectedExecutionException 
     * @throws NullPointerException       
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

ThreadPoolExecutor欄位描述

ThreadPoolExecutor欄位 ``` public class ThreadPoolExecutor extends AbstractExecutorService { /** * ctl(the main pool control state):用於維護了以下兩個欄位的值 * workCount: 存活著的線程數 低29位,大概5億 * runState: 高3位 線程池服務的狀態: RUNNING(-1),SHUTDOWN(0),STOP(1),TIDYING(2),TERMINATED(3) */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; //如何快速獲取n個1? [(1 << n) - 1],CPACITY=29個1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 高3位 private static int runStateOf(int c) { return c & ~CAPACITY; } // 低29位 private static int workerCountOf(int c) { return c & CAPACITY; } // rs + wc => rs | wc; 加法換成位運算,更快一點 :) private static int ctlOf(int rs, int wc) { return rs | wc; } private volatile int corePoolSize; private volatile int maximumPoolSize; private volatile boolean allowCoreThreadTimeOut; /** * idle thread的定義: waiting for work * Timeout in nanoseconds for idle threads waiting for work. * 如果線程池中運行的線程比corePoolSize大,多餘的線程會在沒有任務的時候的等待keep-alive times,如果在這個時間段內還是沒有任務執行,會回收這個線程直到corePoolSize數量(如何回收的? go processWorkerExit), * 註:(這個欄位只有當maximumPoolSize大於corePoolSize時有效) */ private volatile long keepAliveTime; // 任務隊列 private final BlockingQueue workQueue; /** *新的線程 是通過 ThreadFactory創建的,預設的創建方式是通過Executors.defaultThreadFactory來創建,我們可以使用guava提供的ThreadFactoryBuilder()來創建自定義的線程工廠類 */ private volatile ThreadFactory threadFactory; // 飽和策略 private volatile RejectedExecutionHandler handler; /** * 預設的拒絕策略是在execute()的時候直接拋出異常,使用的時候要記得處理這個異常 */ private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); /** * 保護worker的鎖,為什麼不用 concurrent包中的集合,而是使用 HashSet + * Lock呢?在shutdown的時候會發中斷信號給idle work, * 如果shutdown被併發執行,那麼idle的線程就不停地接受到中斷信號(intrrupt storms), * 但是如果使用lock + hashSet的時候,shutdown會先獲取鎖,然後再發中斷信號, * 這樣即使shutdown方法被併發調用,後來的調用由於無法獲取鎖無法發送中斷信號,從而避免了中斷風暴. * 註:(主要為了確保穩定,而不是性能,所以選擇使用HashSet+Lock,而不是 concurrentSet, * workerSet併發訪問的場景並不多,除了shutdown,shutdownNow,以及一些統計方法比如 * largestPoolSize外可能併發訪問workerSet) */ private final ReentrantLock mainLock = new ReentrantLock(); /** * 維護工作者線程, 只有再獲取到mainLock的時候才允許訪問(為了線程安全) * @ThreadSafe(mainLock) * 註:([臨界區問題](os層面),[data race, race condition](現象分析層面),[不變性條件,先驗條件,後驗條件,線程安全](類設計層面),[可見性,順序性,原子性](純理論層面),複合操作描述的都是同一個事情,即併發程式執行的正確性) */ private final HashSet workers = new HashSet(); /** * awaitTermination的實現,將等待線程放到 condition中維護 */ private final Condition termination = mainLock.newCondition(); } ```

ThreadPoolExecutor狀態描述

該類主要包含下麵幾個狀態:

狀態描述
RUNNINGAccept new tasks and process queued tasks
SHUTDOWN Don't accept new tasks, but process queued tasks
STOPDon't accept new tasks, don't process queued tasks
TIDYINGAll tasks have terminated, workerCount is zero,the thread transitioning to state TIDYING will run the terminated() hook method
TERMINATED terminated() has completed
ThreadPoolExecutor狀態機:

狀態間的操作:

操作描述
shutdown()僅會給idle Worker發送中斷信號,會緩慢的結束線程池服務:將queue中的任務都執行完
shutdownNow()會給所有Worker發送中斷信號,會快速的結束線程池服務(不安全): 嘗試中斷正在執行任務的線程,同時返回queue中的任務列表
awaitTermination()是一個基於狀態的方法,將在狀態達到TERMINATED時返回,可以用在需要同步判斷線程池關閉的場景
其餘方法從圖可以看出,任何可以使的queue和pool為空的操作比如:addWorkerFailed,processWorkerExit,shutdown,shutdownNow等都有可能使得狀態轉為TERMINATE,所以這些方法都會調用tryTerminate(),以確保服務在正確的狀態

Worker欄位描述

點擊查看代碼
**
     * 實際的工作者線程,主要維護線程的中斷狀態
     * 這個類為了簡化在運行任務的時候對鎖的獲取和釋放,設計成了 extends AQS
     * 當shutdown的時候,會通過tryLock判斷線程是否正在執行任務,如果為false,表示線程不在執行任務,而是在等待新的任務,通過發送中斷信號,中斷這些線程的等待,這些線程被中斷後會判斷是由於什麼原因喚醒的,如果這個時候線程池狀態為SHUTDOWN,那麼這些線程就會被回收.
     * 註:(線程被喚醒的原因可能是被中斷了,也有可能是有任務了,也有可能是時間到了,喚醒後需要二次判斷go getTask())
     * 註:(線程由於沒有任務掛起(poll()), 掛起期間可能有新的任務過來了(offer())被喚醒,也有可能被中斷信號通知關閉而被喚醒.)
     */
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        /**
         * Worker繼承了AQS,也就是說Worker還有一個state屬性欄位,這個欄位是有必要分析下的:
         * -1: 剛初始化 
         * 0: 剛調用runWorker或者沒任務了 
         * 1: 正在執行任務,正是通過這個state欄位,來判斷線程是否正在執行任務(tryLock)
         */
        final Thread thread;
        // 在Worker初始化時,firstTask可能有值
        Runnable firstTask;
        // 每個工作者線程完成的任務數,任務性質可以不同,即線程是可以復用的
        volatile long completedTasks;
        Worker(Runnable firstTask) {
        // 只有在worker線程已開始的時候中斷才有意義,所以在初始化worker的時候state=-1,這個時候不會被中斷go isLocked()
            setState(-1);
            this.firstTask = firstTask;
            // 初始化Workder的時候 通過 threadFactory創建線程,最終通過系統調用,由OS創建內核線程
            this.thread = getThreadFactory().newThread(this);
        }
        // runWorker實際實現主執行迴圈,接下來就是重點了,任務線程初始化時,拿到了firstTask(有的話),以及一個新的線程,接下來就開始真正地執行任務了
        public void run() {
            runWorker(this);
        }
        // 設計worker類的主要目的,用來中斷線程
        void interruptIfStarted() {
            Thread t;
            // 只有在worker線程已開始的時候中斷才有意義, 所以在初始化worker的時候state=-1,這個時候不會被中斷
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

Worker狀態描述

Worker主要有3個狀態

狀態描述
INIT(-1)初始Worker狀態
WAINTING TASK(0)等待任務到達
RUNING正在執行任務

Worker狀態機如圖

任務的提交與調度

在介紹完具體的ThreadPoolExecutor與Worker的描述以及狀態機後,我們先來大致看下ThreadPoolExecutor的工作流程,有助於理解後續的操作步驟.

從圖中我們可以看出,一個正常執行完成的任務其主要經過submit() -> addWorker()->worker.thread.start()->worker.run()->runWorker()→task.run()等步驟,下麵我們具體介紹下這些步驟.

任務調度方法主要在execute,具體源碼註釋如下:

點擊查看代碼
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // 線程池小於core
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 新增任務失敗,可能在addWorker的時候線程數達到了corePoolSize的水平,此時放到workQueue
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 判斷如果線程池正在shutdown,拒絕任務
            if (!isRunning(recheck) && remove(command))
                reject(command);
            // 確保任務隊列中的任務可以被線程執行
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
            // 工作隊列滿了,再嘗試增加worker,線程個數判斷使用 maxvalue
        } else if (!addWorker(command, false))
            reject(command);
    }

工作線程的創建與執行

工作線程的創建主要根據線程池狀態,core和maximum參數判斷是否可以新增工作線程,如果新增成功,則開始執行任務.

工作線程的創建

點擊查看代碼
private boolean addWorker(Runnable firstTask, boolean core) {
        retry: for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                    !(rs == SHUTDOWN &&
                            firstTask == null &&
                            !workQueue.isEmpty()))
                // 當shutdown且 隊列是空的時候就沒必要加worker了
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 達到限制數量了也返回false
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get(); // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        // 非shutdown,或者 是shutdown但是firstTask==null的時候,可以新增線程
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        // 新增worker的時候更新largestPoolSize
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // start() -> runWorker()->task.run()
                    // 新增成功後 調用start(),如果start()失敗了,比如ntive stack申請失敗,也返回false
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (!workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

工作線程的執行

工作者線程的執行原理上比較簡單,既不斷從任務隊列中取出任務,執行任務,然後返回線程池並等待下一個任務。

// 典型的線程池工作者線程結構
public void run() {
    Throwable thrown = null;
    try {
        while(!isInterrupted()) 
            runTask(getTaskFromWorkQueue());
        }catch(Throwable e) {
            throw = e;
        }finaly {
        threadExited(this,thrown);
    	}
    }
}

下麵是ThreadPoolExecutor實際的工作者線程的任務執行,其中會涉及到線程的回收,任務的取消等實現.

點擊查看代碼
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // state: -1 => 0 , unlock -> release -> tryRelease -> state=0
        // 這個時候任務線程開始工作,可以被中斷
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // getTask從隊列中拿任務
            while (task != null || (task = getTask()) != null) {
       // 工作的時候將state置為1,表示正在工作,這個操作一定會成功(正常來說lock是一個基於狀態的方法,可能會阻塞調用線程),因為不會有其他地方調用w.lock
                // 註:(state: 0 => 1 lock -> acquire -> tryAcquire -> state=1)
                w.lock();
                // 線程當且僅當池子stopping(shutdown,shutdownNow的時候)的時候才會interrupted,且一定要interrupted
                // 註:(worker線程是由線程池服務來維護的,只有線程池服務有權對worker線程進行中斷操作)
                if ((runStateAtLeast(ctl.get(), STOP) ||
                // 註:(Thread.interrupted會清除interrupted標記)
                // 這裡表明worker線程只能在STOPING(STOP,TIDING,TERMINATED)時中斷信號有效,其他形式的中斷信號(例如在任務中中斷)會被清除
                        (Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP)))
                        &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // hooc 函數
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                  // 保存異常 thrown 到1326處理(給客戶端提供的鉤子函數,afterExecute,使客戶端可以感知到任務失敗併進行特定的處理),同時拋出異常到
                        // 1330 處理(線程池自身對任務異常的處理)
                        thrown = x;
                        throw x;
                    } catch (Error x) {
                        thrown = x;
                        throw x;
                    } catch (Throwable x) {
                        thrown = x;
                        throw new Error(x);
                    } finally {
                        // 將任務執行過程中的異常傳入到hooc函數
                        afterExecute(task, thrown);
                    }
                } finally {
       // beforeExecutehooc函數出錯或者任務出錯了的話,task=null,從而跳到1336,completedAbruptly=true,從而回收線程,即使線程並沒有完成任何工作
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 處理task=null的場景或者任務拋處異常時的場景,釋放線程,什麼時候task會為null ,go getTask
            processWorkerExit(w, completedAbruptly);
        }
    }

服務的關閉,任務的取消與線程的回收

服務的關閉

通過調用shutdown或者shutdownNow給工作線程發送中斷信號嘗試取消任務,並回收線程,繼而關閉服務

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 狀態至為SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 給每個idle工作線程(已啟動且沒任務的)線程發送中斷信號
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 狀態置為 STOP
            advanceRunState(STOP);
            // 給每個工作線程發送中斷信號
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

線程的回收

線程回收流程圖:

觸發線程的回收主要有下麵幾種情況

  • 由於setMaximumPoolSize,導致currentSize > maximumPoolSize時,getTask()返回null
  • 線程池狀態為stop時,即調用shutdownNow()時,getTask()返回null
  • 線程池狀態為shutdown,即調用shutdown(),線程池給idle線程發送中斷信號,如果此時任務隊列為空時,getTask()返回null
  • 線程等待任務超時,getTask()返回null
  • 任務執行失敗,拋出運行時異常,導致task=null

當getTask()返回null或者task=null時,runWorker()跳到processWorkExit()處理線程回收,此時會新增線程來替換由於任務異常而被終止的線程

點擊查看代碼
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // stopped 或者 shutdown 且 workQueue.isEmpty 返回null 2,3
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            // allowCoreThreadTimeOut等價於 wc> corePooSize
            // allowCoreThreadTimeOut, wc>corePoolSize, 一起表示 當任務線程獲取任務超時時,被要求中斷(subject to termination)
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 1 wc > maxPoolSize 或者 4 獲取任務超時且 要求獲取任務超時的進程被中斷(timed && timedOut)
            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                // 如果沒有任務,則阻塞在這裡, workQueue.offer後繼續運行
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null)
                    return r;
                // r==null,poll返回null,表示timedOut,下次 go 1210
                timedOut = true;
            } catch (InterruptedException retry) {
                // 忽略中斷信號
                timedOut = false;
            }
        }
    }

線程池通過workers.remove()操作來釋放worker的引用,從而由垃圾回收器回收線程,如果線程是由於任務執行異常而導致的終止,則會新增工作線程來替換它.

點擊查看代碼
private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            // getTask時會decrementWorkerCount
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 回收線程前先將線程執行的任務數加一下
            completedTaskCount += w.completedTasks;
            // 通過釋放worker引用來釋放線程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && !workQueue.isEmpty())
                    min = 1;
                // 如果不是由於任務忽然中斷且線程數符合最小值的要求,那麼無需addWorker替換
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 如果任務線程是由於任務執行異常退出的 或者 線程池中的數量小於min,addWorker
            addWorker(null, false);
        }
    }

由上文我們瞭解到,無論是任務的取消,還是線程池服務的關閉,其中都是通過線程的中斷來實現的,理解了線程中斷我們就能夠理解任務的取消以及服務關閉的具體含義。

線程的中斷

中斷機制是一種Java線程間的通信機制,每個Java線程都有一個boolean類型的中斷狀態。當調用Thread.interrupt(),並不意味著立即停止目標線程正在執行的任務,只是傳遞一個中斷請求,將中斷狀態置為true。至於什麼時候讀這個狀態,以及基於這個狀態做什麼操作,則完全由任務自身去控制。(早期的jdk庫提供了Thread.stop(),Thread.suspend(),Thread.resume()來允許用戶暴力終止,暫停,恢復一個線程,在jdk1.2後這些方法就被置為deprecated了,因為這樣操作是不安全的,stop()會強制釋放掉線程持有的鎖,拋出ThreadDeathException,導致數據處於不一致的狀態,從而造成未知的後果)例如:

public class Factorizer {
    private BigInteger lastNumber;
    private BigInteger[] lastFactors;
    
    public synchronized BigInteger[] cal(BigInteger number) {
        if(Objects.equal(number,lastNumber)) {
            return lastFactors;
        }else {
            //這兩步是複合操作,需要原子性,我們不會在這兩步之間判斷Thread.currentThread().isInterrupted()
            lastFactors = factor(i);
            lastNmuber=i;
         	return lastFactors;
        }
    }
}

jdk中有許多長時任務都是通過中斷機制取消任務的。它們對中斷的響應通常是:清除中斷狀態(Thread.interrupted()),然後拋出一個異常(InterruptedException),表示長時任務操作由於中斷而提前結束。(wait,join,sleep,FutureTask.get(),CoundownLatch.await,lockInterrrupted(),BlockQueue.poll()等)

在編寫任務的時候,基於這個狀態做什麼請求或者不做請求,例如重試或者忽略,都是可以的,只要滿足自身任務的需要即可。但設計糟糕的任務可能會屏蔽中斷請求,從而導致其他方法調用該任務的時候無法對中斷進行響應,例如:

不安全的中斷示例
public static void main(String[] args) {
        Thread calPrimeTask = new Thread(InterruptedTest::calPrime);
        calPrimeTask.start();
        ThreadUtil.sleep(1000);
        // 嘗試終止calPrimeTask
        calPrimeTask.interrupt();
    }


    public static void calPrime() {
        while(!Thread.currentThread().isInterrupted()) {
            ThreadUtil.sleep(50);
            log();
            System.out.println("一個耗時50ms的任務完成!");
        }
    }

    public static void log() {
        /**
         * 假設有一段代碼調用了jdk中的某個可能拋出InterruptedException的介面,這段代碼捕獲到這個異常後本意是不會處理這個異常,但是如果它沒有再
         * Thread.currentThread().interrupt(),就會影響其他使用到這個方法的函數,例如calPrime();
         */
        ArrayBlockingQueue<Integer> que = new ArrayBlockingQueue<>(12);
        try {
            System.out.println("do other thing");
            que.poll(30, TimeUnit.MILLISECONDS);
        }catch (InterruptedException e) {
            e.printStackTrace();
            // poll拋出 InterruptedException後,會清空 interrupted標記,這裡返回false
            System.out.println(Thread.currentThread().isInterrupted());
          // 如果這裡不重新設置interrupted標記的話,這回使得calPrimary任務無法取消,我們不知道調用棧的其他地方是否會用到中斷信號,所以必須把中斷信號設置回去
           Thread.currentThread().interrupt();
        }
    }


// 不支持取消但仍可以調用可中斷阻塞方法,忽略中斷信號並重試
public Task getNextTask(BlockingQueue<Task> queue) {
    boolean interrupted = false;
    try {
        while(true) {
            try {
                return queue.take();
            }catch(InterruptedException e) {
                interrupted = true;
                // 忽略並重試
                // 如果我們在這裡調用Thread.currentThread().interrupt()的話會引起死迴圈
            }
        }finally {
            if(interrupted) {
                // 避免屏蔽中斷信號,其他方法可能需要
                Thread.currentThread().interrupt();
            }
        }
    }
}

線程池的使用

在實際生產生活中,由於任務性質的多種多樣,我們往往會自定義符合各自應用場景的線程池來執行任務,不同的線程池參數設置意味著不同的任務執行策略(避免雞蛋放在一個籃子里)。

// 不自定義線程池的危害:可能造成無法預知的死鎖情況,次要的任務的執行影響重要的任務
    public void deadLock() throws Exception{
     
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Callable<String> task1 = () -> {
                ThreadUtil.sleep(2000);
                countDownLatch.countDown();
                return "task1 finished";
        };
        // task2 依賴 task1
        Callable<String> task2 = () -> {
            countDownLatch.await();
            ThreadUtil.sleep(1000);
            return "task2 finished";
        };
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        // 假如 task2先於task1調度,就會發生死鎖,因為只有一個線程,task1在任務隊列里依賴task2的完成
        Future<String> result2 = executorService.submit(task2);
        Future<String> result1 = executorService.submit(task1);
        System.out.println(result2.get() + result1.get());
    };

那麼,線程池參數的選擇就顯得尤為重要。以下是一些ThreadPoolExecutor參數的介紹以及使用建議。

1 核心線程數與最大線程數

從上文的任務執行流程我們大致可以瞭解到,線程池主要通過這兩個參數控制工作線程的數量。在設置這兩個參數的時候需要註意以下兩個問題。

1.1 工作線程的大小設置

大小設置主要影響系統的吞吐量。如果設置過小造成資源利用率低,人為地降低了系統的吞吐量,如果設置過大會造成線程競爭加劇,使得消耗更多的的計算資源線上程上下文切換上而不是執行任務上,最終也會導致系統的吞吐量降低。

設置線程池大小主要以下幾種策略:

策略一
coreSize = 2×Ncpu,maxSize = 25×Ncpu

實際使用中我們往往會定義許多線程池,如果每個線程池的大小會導致核心線程越來越多,會使得競爭加劇,甚至達到操作系統限制的線程池數量

策略二
W/C: wait time/ compute time
Ucpu: 目標CPU利用率
Ncpu: Runtime.getRuntime().availableProcessors()
Nthreads = Ncpu * Ucpu * (1 + W/C)

I/O任務大部分時間都在等待I/O完成,這個值會比較大,上線前不好估計此值,而且線程池中的任務類型不一定都是一致的

策略三
QPS:每秒任務數 999線:單個任務的花銷
1s內一個線程能夠執行的任務數: 1/999線
1s內n個線程能執行的任務數:n * 1/999線
即 QPS=n/999線 ==> n = QPS*999線

核心線程:corePoolSize = QPS * 999線(單位:s)

timeout: 能容忍介面的最大響應時間
隊列大小:queueCapacity = corePoolSize/999線 * timeout = QPS * timeout

最大線程:maxPoolSize = (QPS峰值- queueCapacity) * 999線

此策略考慮了實際生產環境的任務使用情況,也是假定線程池中的任務是同類型的。

如果線程池中的任務不是服務間調用而是單獨的函數或者sql調用,那麼QPS和999線就不好估計了。

策略四
使用動態線程池,可以動態調整線程池參數,以應對不同的使用場景變化,且可以通過cat監控線程池的使用情況。
實際生產業務使用中建議使用動態線程池,動態調整任務執行策略,同時為避免線程資源浪費,搭配下文提到的allowCoreThreadTimeOut一起使用。

 public static ThreadPoolExecutor getExecutor(String name,boolean allowCoreThreadTimeOut) {
        ThreadPoolExecutor result = null;
        try {
            result =  DynamicThreadPoolManager.getInstance().getThreadPoolExecutor(name);
        }catch (PoseidonException e) {
            log.error("ExecutorCase.getExecutor Error:",e);
        }
        if(Objects.isNull(result)) {
            return ExecutorUtil.getExecutor(name,Runtime.getRuntime().availableProcessors());
        }
        // 任務完成後不需要留有核心線程可關閉
        result.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
        return result;
    }

1.2 工作線程的回收

線程本質上是兩個方法調用棧,是一個共用資源。當一個線程池處理QPS較低的任務時(eg:boss後臺的介面,或者執行周期長的定時任務),我們往往會想當無任務執行的時候線程池可以自動回收線程資源,於是將coreSize設置成0。

假如我們將coreSize=0,但卻使用的是有界隊列,比如new ArrayBlockingQueue<>()。按照上文的執行流程,那麼只有當任務塞滿任務隊列的時候,線程池才會正式開始執行任務。

為瞭解決這個問題,jdk1.6版本後的ThreadPoolExecutor提供了allowCoreThreadTimeOut欄位。將這個欄位置為ture後,我們不用設置coreSize=0,就可以讓線程在無任務的時候等待keepAliveTime時間,將coreThread回收。具體實現可以查看:getTask,processWorkerExit方法。

2 任務隊列

ThreadPoolExecutor使用的是BlockingQueue作為任務隊列,即任何阻塞隊列都可以用於任務的存儲和轉發。下麵介紹3種常見任務隊列的選擇策略

策略一

無界隊列(Unbounded queues)

例如LinkedBlockingQueue。使用無界隊列主要適用於任務執行時間很短且確定的任務,例如找出某個自然數的因數。這種任務一定能夠快速執行完成。但是實際業務場景中的任務執行時間通常是不確定的,需要遠程調用介面,有許多I/O操作,這樣就找出了任務消費很慢,如果此時有任務提交過來會找出OOM,從而影響整個服務的穩定。所以不建議使用。

策略二

有界隊列(Bounded queues)

例如ArrayBlockingQueue。用於限制資源的使用量,避免出現OOM。

調整任務隊列長度時往往也要調整最大線程數(maxmiumSize)。

case問題
很大的queue.size,很小的maxmiumSize一方面降低CPU了使用率和線程切換頻率,避免過度競爭,從而導致人為的降低了吞吐量(可以是優點也可以是缺點)
很小的queue.size,很大的maxmiumSize會導致CPU使用率增高,這也會導致吞吐量降低

實際使用的時候大部分任務都是i/o密集型的,所以其實可以併發執行比我們想的更多的任務,適用於不緊急但希望儘可能快的任務,例如定時job任務或者導出任務。這種任務的執行我們希望在不影響其他服務的情況下儘可能快的執行。

策略三

直接處理任務(Direct handoffs)

例如:synchoronousQueue。當客戶端提交任務時,在有合適的線程執行此任務才返回,否則阻塞客戶端。

使用這個隊列少了入隊和出隊操作,效率更好,適用於需要儘快響應的任務,例如h5端的介面。

這種方式通常需要 unbounded maxmiumPoolSize, 即無限制的線程數,但是如果當客戶端不停地提交任務且消費不過來地時候,會有線程數瘋狂飆升,造成系統不穩定的風險。所以實際使用中還是會限制 maxmiumSize的值,可以通過使用下文中提到的CallerRunRejectPolicy來緩慢降低客戶端提交任務的速度。從而將非同步降級為同步執行。

3 任務拒絕策略

線上程池關閉(shutdown()),或者任務隊列滿了,工作線程也滿了的時候會執行RejectedExecutionHandler.rejectedExecution(),來拒絕任務,jdk為我們提供了以下四種拒絕策略,我們也可以自己定義合適的拒絕策略.

拒絕策略描述
ThreadPoolExecutor.AbortPolify預設拒絕策略,拋出RejectedExecutionException異常來通知客戶端有任務被拒絕,客戶端經常會忽略這個異常的處理導致發生線上問題
ThreadPoolExecutor.DiscardPolify丟棄最新提交的任務。一般沒有哪個任務是可以丟棄的,不建議使用。
ThreadPoolExecutor.DiscardOldestPolify丟棄最先提交的任務。這裡註意如果使用的是優先順序隊列的話,會拋棄最高優先順序的任務,隨意得謹慎使用
ThreadPoolExecutor.CallerRunPolify由客戶端線程執行任務,即 客戶端代碼 -> submit() -> task.run() ->客戶端代碼,可以降低任務的提交速度,使得由非同步執行降級為同步執行

4 工作線程工廠類

線程池使用線程工廠類來生成工作線程,我們可以自定義或者使用guava提供的ThreadFactoryBuilder()來創建線程工廠類

public class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory(String threadPoolName) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        namePrefix = threadPoolName + "-" + poolNumber.getAndIncrement() + "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                namePrefix + threadNumber.getAndIncrement(),
                0);
        if (t.isDaemon()) {
            t.setDaemon(false);
        }
        if (t.getPriority() != Thread.NORM_PRIORITY) {
            t.setPriority(Thread.NORM_PRIORITY);
        }
        return t;
    }
}
// guava
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat(name + "-pool-%d").build();

JDK平臺提供的預設線程池

Executors作為Executor介面的伴生類,提

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • php-fpm下讀取到is_cli為true,不知道你們是否遇到過,我是遇到了。。。。 有人會說,即使為true又怎麼了,你是沒遇到有些根據is_cli來走不同邏輯判斷的,如果讀取的是錯的就會引起很大的問題。。。。 ...
  • 工作中會遇到需要監控程式運行過程數據的情況,比如定時執行監控的cmd並記錄執行結果,本文提供一種實現方式,便於後續排查問題。 ...
  • 本文講述了我排查「Hyperf 註解失效」問題的過程,整個排查過程看似一氣呵成,但實際上要曲折得多,甚至一度覺得這是個玄學問題。 ...
  • SpringBoot之靜態資源訪問&REST風格請求 1.SpringBoot靜態資源訪問 1.1基本介紹 只要靜態資源是放在類路徑下的:/static、/public、/resources、/META-INF/resources,則可以直接被訪問。根據是: SpringBoot在啟動的時候會去解析 ...
  • 遞歸 引入 什麼是遞歸?先看大家都熟悉的一個民間故事:從前有座山,山上有座廟,廟裡有一個老和尚在給小和尚講故事,故事里說,從前有座山,山上有座廟,廟裡有一個老和尚在給小和尚講故事,故事里說……。象這樣,一個對象部分地由它自己組成,或者是按它自己定義,我們稱之為遞歸。 一個函數、過程、概念或數學結構, ...
  • Maven Maven是apache軟體基金會旗下的一個開源項目,是一款用於管理和構建Java項目的工具。 Maven的作用? 先來簡單介紹一下Maven的作用 (1)依賴管理 方便快捷的管理項目依賴的資源(就是咱們常說的jar包),避免一些版本衝突。 方便快捷的把jar包通過Maven的指定格式引 ...
  • Python是一種高級編程語言,它用於通用編程,由Guido van Rossum 在1991年首次發佈。Python 的設計著重於代碼的可讀性。 Python有一個非常大的標準庫,並且有一個動態類型系統,它還具有自動記憶體管理功能,支持多種編程範例。這些包括: ● 面向對象 ● 命令式 ● 函數式 ...
  • 剛開始使用eclipse軟體學習Java時,發現它的工具欄的圖標實在是太小了,怎麼解決呢? 你開始打開瀏覽器,在搜索欄中敲入“eclipse的工具欄的圖標太小怎麼辦?”,瀏覽了很多的方法,然後發現一個帖子上寫的方法很簡單(如下圖), 按照它的方法操作,發現圖標的大小問題解決了,但是卻出現了一個更大的 ...
一周排行
    -Advertisement-
    Play Games
  • Timer是什麼 Timer 是一種用於創建定期粒度行為的機制。 與標準的 .NET System.Threading.Timer 類相似,Orleans 的 Timer 允許在一段時間後執行特定的操作,或者在特定的時間間隔內重覆執行操作。 它在分散式系統中具有重要作用,特別是在處理需要周期性執行的 ...
  • 前言 相信很多做WPF開發的小伙伴都遇到過表格類的需求,雖然現有的Grid控制項也能實現,但是使用起來的體驗感並不好,比如要實現一個Excel中的表格效果,估計你能想到的第一個方法就是套Border控制項,用這種方法你需要控制每個Border的邊框,並且在一堆Bordr中找到Grid.Row,Grid. ...
  • .NET C#程式啟動閃退,目錄導致的問題 這是第2次踩這個坑了,很小的編程細節,容易忽略,所以寫個博客,分享給大家。 1.第一次坑:是windows 系統把程式運行成服務,找不到配置文件,原因是以服務運行它的工作目錄是在C:\Windows\System32 2.本次坑:WPF桌面程式通過註冊表設 ...
  • 在分散式系統中,數據的持久化是至關重要的一環。 Orleans 7 引入了強大的持久化功能,使得在分散式環境下管理數據變得更加輕鬆和可靠。 本文將介紹什麼是 Orleans 7 的持久化,如何設置它以及相應的代碼示例。 什麼是 Orleans 7 的持久化? Orleans 7 的持久化是指將 Or ...
  • 前言 .NET Feature Management 是一個用於管理應用程式功能的庫,它可以幫助開發人員在應用程式中輕鬆地添加、移除和管理功能。使用 Feature Management,開發人員可以根據不同用戶、環境或其他條件來動態地控制應用程式中的功能。這使得開發人員可以更靈活地管理應用程式的功 ...
  • 在 WPF 應用程式中,拖放操作是實現用戶交互的重要組成部分。通過拖放操作,用戶可以輕鬆地將數據從一個位置移動到另一個位置,或者將控制項從一個容器移動到另一個容器。然而,WPF 中預設的拖放操作可能並不是那麼好用。為瞭解決這個問題,我們可以自定義一個 Panel 來實現更簡單的拖拽操作。 自定義 Pa ...
  • 在實際使用中,由於涉及到不同編程語言之間互相調用,導致C++ 中的OpenCV與C#中的OpenCvSharp 圖像數據在不同編程語言之間難以有效傳遞。在本文中我們將結合OpenCvSharp源碼實現原理,探究兩種數據之間的通信方式。 ...
  • 一、前言 這是一篇搭建許可權管理系統的系列文章。 隨著網路的發展,信息安全對應任何企業來說都越發的重要,而本系列文章將和大家一起一步一步搭建一個全新的許可權管理系統。 說明:由於搭建一個全新的項目過於繁瑣,所有作者將挑選核心代碼和核心思路進行分享。 二、技術選擇 三、開始設計 1、自主搭建vue前端和. ...
  • Csharper中的表達式樹 這節課來瞭解一下表示式樹是什麼? 在C#中,表達式樹是一種數據結構,它可以表示一些代碼塊,如Lambda表達式或查詢表達式。表達式樹使你能夠查看和操作數據,就像你可以查看和操作代碼一樣。它們通常用於創建動態查詢和解析表達式。 一、認識表達式樹 為什麼要這樣說?它和委托有 ...
  • 在使用Django等框架來操作MySQL時,實際上底層還是通過Python來操作的,首先需要安裝一個驅動程式,在Python3中,驅動程式有多種選擇,比如有pymysql以及mysqlclient等。使用pip命令安裝mysqlclient失敗應如何解決? 安裝的python版本說明 機器同時安裝了 ...