Disruptor測試結果運算1億次,耗時5503ms,吞吐量18171000/s,於是我扒開了Disruptor高性能的外衣

来源:https://www.cnblogs.com/jiagooushi/archive/2022/09/20/16711199.html
-Advertisement-
Play Games

能對比測試 為了直觀地感受 Disruptor 有多快,設計了一個性能對比測試:Producer 發佈 1 億次事件,從發佈第一個事件開始計時,捕捉 Consumer 處理完所有事件的耗時。 測試用例在 Producer 如何將事件通知到 Consumer 的實現方式上,設計了兩種不同的實現: Pr ...


能對比測試

為了直觀地感受 Disruptor 有多快,設計了一個性能對比測試:Producer 發佈 1 億次事件,從發佈第一個事件開始計時,捕捉 Consumer 處理完所有事件的耗時。

測試用例在 Producer 如何將事件通知到 Consumer 的實現方式上,設計了兩種不同的實現:

  1. Producer 的事件發佈和 Consumer 的事件處理在不同的線程,通過 ArrayBlockingQueue 傳遞給 Consumer 進行處理;
  2. Producer 的事件發佈和 Consumer 的事件處理在不同的線程,通過 Disruptor 傳遞給 Consumer 進行處理;

3.1 代碼實現

3.1.1 計算代碼

進行CAS累加運算

public class CommonUtils {
    private static AtomicLong count = new AtomicLong(0);

    public static void calculation() {
        count.incrementAndGet();
    }

    public static long get() {
        return count.get();
    }
}
3.1.2 抽象類

進行一億次 CAS運算計算耗時

/**
 * 抽象類
 *
 * @param <T>
 */
public abstract class AbstractTask<T> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractTask.class);
    //線程池
    private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    //一億次測試
    public static long tasksize = 100000000;


    /**
     * 開始調用測試
     */
    public void invok() {
        //計算當前事件
        long currentTime = System.currentTimeMillis();
        //獲取到監聽器
        Runnable monitor = monitor();
        if (null != monitor) {
            executor.execute(monitor);
        }
        //啟動
        start();

        //執行任務發佈
        Runnable runnable = getTask();
        for (long i = 0; i < tasksize; i++) {
            runnable.run();
        }

        //停止任務
        stop();
        //等待任務發佈完成
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executor.shutdown();
        //獲取處理結果
        T result = getResult();
        //計算耗時
        long duration = System.currentTimeMillis() - currentTime;
        //計算吞吐量
        long throughput = (tasksize / duration) * 1000;
        logger.info("每秒吞吐量:[{}];({}/{})", throughput, result, duration);
    }


    /**
     * 獲取監聽器
     *
     * @return
     */
    public Runnable monitor() {
        return null;
    }

    /**
     * 啟動任務
     */
    public void start() {

    }

    /**
     * 完成任務
     */
    public void complete() {
        countDownLatch.countDown();
    }

    /**
     * 停止任務
     */
    public void stop() {

    }

    /**
     * 獲取需要執行的任務
     *
     * @return
     */
    public abstract Runnable getTask();

    /**
     * 獲取運行結果
     *
     * @return
     */
    public abstract T getResult();
}

3.1.3 Disruptor性能測試代碼
public class DisruptorTest extends AbstractTask<Long> {
    //定義隨機數生成器
    private static final Random r = new Random();
    //定義Disruptor對象
    private Disruptor disruptor = null;
    //定義Disruptor事件發佈對象
    private LongEventProducerWithTranslator translator = null;

    /**
     * 啟動
     */
    @Override
    public void start() {
        //定義事件工廠
        EventFactory<LongEvent> eventFactory = new LongEventFactory();
        // RingBuffer 大小,必須是 2 的 N 次方;
        int ringBufferSize = 1024 * 1024;
        //構建disruptor對象
        disruptor = new Disruptor<LongEvent>(eventFactory,
                ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE,
                new YieldingWaitStrategy());
        //定義事件處理類
        EventHandler<LongEvent> eventHandler = new LongEventHandler();
        //配置事件處理類
        disruptor.handleEventsWith(eventHandler);
        //啟動disruptor
        disruptor.start();
        //創建事件發佈對象
        translator = new LongEventProducerWithTranslator(disruptor.getRingBuffer());
    }

    /**
     * 停止任務
     */
    @Override
    public void stop() {
        disruptor.shutdown();
        System.out.println("運算結果:" + CommonUtils.get());
        //完成任務
        complete();
    }

    /**
     * 獲取需要執行的任務
     *
     * @return
     */
    @Override
    public Runnable getTask() {
        return () -> {
            publishEvent();
        };
    }

    /**
     * 獲取運行結果
     *
     * @return
     */
    @Override
    public Long getResult() {
        return CommonUtils.get();
    }


    /**
     * 發佈對象
     */
    private void publishEvent() {
        //獲取要通過事件傳遞的業務數據
        Long data = r.nextLong();
        // 發佈事件
        translator.onData(data);
    }


    public static void main(String[] args) {
        DisruptorTest disruptorTest = new DisruptorTest();
        disruptorTest.invok();
    }

}

輸出結果

10:45:22.941 [main] INFO com.heima.task.AbstractTask - 每秒吞吐量:[18171000];(100000000/5503)
ArrayBlockingQueue性能測試代碼
public class ArrayBlockingQueueTest extends AbstractTask {
    private static final Random r = new Random();
    private static final ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue(10000000);


    @Override
    public Runnable monitor() {
        return () -> {
            try {
                for (int i = 0; i < tasksize; i++) {
                    //獲取一個元素
                    queue.take();
                    //執行計算
                    CommonUtils.calculation();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            complete();
        };
    }

    public static void main(String[] args) {
        ArrayBlockingQueueTest test = new ArrayBlockingQueueTest();
        test.invok();
    }

    @Override
    public Runnable getTask() {
        return () -> {
            publish();
        };
    }

    @Override
    public Object getResult() {
        return CommonUtils.get();
    }

    public void publish() {
        Long data = r.nextLong();
        try {
            queue.put(data);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

輸出結果

10:45:46.379 [main] INFO com.heima.task.AbstractTask - 每秒吞吐量:[6192000];(100000000/16148)

3.2 測試對比

測試類 運算次數 耗時(ms) 吞吐量/s
ArrayBlockingQueue 1億次 16148 6192000
Disruptor 1億次 5503 18171000

3.3 Disruptor官方性能測試

Disruptor論文中講述了一個實驗:

  • 這個測試程式調用了一個函數,該函數會對一個64位的計數器迴圈自增5億次。
  • 機器環境:2.4G 6核
  • 運算: 64位的計數器累加5億次
Method Time (ms)
單線程 300
單線程使用 CAS 5,700
單線程使用鎖 10,000
單線程使用volatile 4,700
多線程使用 CAS 30,000
多線程使用鎖 224,000

4. 高性能原理

  • 引入環形的數組結構:數組元素不會被回收,避免頻繁的GC,
  • 無鎖的設計:採用CAS無鎖方式,保證線程的安全性
  • 屬性填充:通過添加額外的無用信息,避免偽共用問題
  • 元素位置的定位:採用跟一致性哈希一樣的方式,一個索引,進行自增

4.1 偽共用概念

4.1.1 電腦緩存構成

​ 下圖是計算的基本結構。L1、L2、L3分別表示一級緩存、二級緩存、三級緩存,越靠近CPU的緩存,速度越快,容量也越小,所以L1緩存很小但很快,並且緊靠著在使用它的CPU內核;L2大一些,也慢一些,並且仍然只能被一個單獨的CPU核使用;L3更大、更慢,並且被單個插槽上的所有CPU核共用;最後是主存,由全部插槽上的所有CPU核共用。

file

​ 當CPU要讀取一個數據時,首先從一級緩存中查找,如果沒有找到再從二級緩存中查找,如果還是沒有就從三級緩存或記憶體中查找。一般來說,每級緩存的命中率大概都在80%左右,也就是說全部數據量的80%都可以在一級緩存中找到,只剩下20%的總數據量才需要從二級緩存、三級緩存或記憶體中讀取,由此可見一級緩存是整個CPU緩存架構中最為重要的部分。

file

下表是一些緩存未命中的消耗數據:

從CPU到 大約需要的CPU周期 大約需要的時間
主存 約60-80ns
QPI匯流排 約20ns
L3 cache 約40-45cycles 約15ns
L2 cache 約10cycles 約3ns
L1 cache 約3-4cycles 約1ns
寄存器 1cycle

可見CPU讀取主存中的數據會比從L1中讀取慢了近2個數量級。

4.1.2 什麼是緩存行

​ 為瞭解決電腦系統中主記憶體與 CPU 之間運行速度差問題,會在 CPU 與主記憶體之間 添加一級或者多級高速緩衝存儲器( Cache)。這個 Cache 一般是被集成到 CPU 內部的, 所以也叫 CPU Cache,如圖所示是兩級 Cache 結構。

file

​ Cache內部是按行存儲的,其中每一行稱為一個cache line,由很多個 Cache line 組成的,Cache line 是 cache 和 RAM 交換數據的最小單位,cache行的大小一般為2的冪次數位元組,通常為 64 Byte。Cache line是Cache與主記憶體進行數據交換的單位。

file

​ 當 CPU 把記憶體的數據載入 cache 時,會把臨近的共 64 Byte 的數據一同放入同一個Cache line,因為空間局部性:臨近的數據在將來被訪問的可能性大。

linux 查看緩存行大小

more /sys/devices/system/cpu/cpu1/cache/index0/coherency_line_size
64
4.1.3 什麼是共用

​ CPU緩存是以緩存行(cache line)為單位存儲的。緩存行通常是 64 位元組,並且它有效地引用主記憶體中的一塊地址。一個 Java 的 long 類型是 8 位元組,因此在一個緩存行中可以存 8 個 long 類型的變數。所以,如果你訪問一個 long 數組,當數組中的一個值被載入到緩存中,它會額外載入另外 7 個,以致你能非常快地遍歷這個數組。事實上,你可以非常快速的遍歷在連續的記憶體塊中分配的任意數據結構。而如果你在數據結構中的項在記憶體中不是彼此相鄰的(如鏈表),你將得不到免費緩存載入所帶來的優勢,並且在這些數據結構中的每一個項都可能會出現緩存未命中。下圖是一個CPU緩存行的示意圖:

file

​ 錶面上 X 和 Y 都是被獨立線程操作的,而且兩操作之間也沒有任何關係。只不過它們共用了一個緩存行,但所有競爭衝突都是來源於共用。

4.1.4 什麼是偽共用

​ 當CPU訪問某一個變數時候,首先會去看CPU Cache內是否有該變數,如果有則直接從中獲取,否者就去主記憶體裡面獲取該變數,然後把該變數所在記憶體區域的一個Cache行大小的記憶體拷貝到Cache(cache行是Cache與主記憶體進行數據交換的單位)。

​ 由於存放到Cache行的的是記憶體塊而不是單個變數,所以可能會把多個變數存放到了一個cache行。當多個線程同時修改一個緩存行裡面的多個變數時候,由於同時只能有一個線程操作緩存行,所以相比每個變數放到一個緩存行性能會有所下降,這就是偽共用。

file

​ 如上圖變數x,y同時被放到了CPU的一級和二級緩存,當線程1使用CPU1對變數x進行更新時候,首先會修改cpu1的一級緩存變數x所在緩存行,這時候緩存一致性協議會導致cpu2中變數x對應的緩存行失效,那麼線程2寫入變數x的時候就只能去二級緩存去查找,這就破壞了一級緩存,而一級緩存比二級緩存更快。更壞的情況下如果cpu只有一級緩存,那麼會導致頻繁的直接訪問主記憶體。

​ 我們的緩存都是以緩存行作為一個單位來處理的,所以失效x的緩存的同時,也會把y失效,反之亦然。

4.1.5 為何會出現偽共用

​ 偽共用的產生是因為多個變數被放入了一個緩存行,並且多個線程同時去寫入緩存行中不同變數。那麼為何多個變數會被放入一個緩存行那。其實是因為Cache與記憶體交換數據的單位就是Cache line,當CPU要訪問的變數沒有在Cache命中時候,根據程式運行的局部性原理會把該變數在記憶體中大小為Cache行的記憶體放如緩存行。

long a;
long b;
long c;
long d;

​ 如上代碼,聲明瞭四個long變數,假設cache line的大小為32個位元組,那麼當cpu訪問變數a時候發現該變數沒有在cache命中,那麼就會去主記憶體把變數a以及記憶體地址附近的b,c,d放入緩存行。也就是地址連續的多個變數才有可能會被放到一個緩存行中,當創建數組時候,數組裡面的多個元素就會被放入到同一個緩存行。那麼單線程下多個變數放入緩存行對性能有影響?其實正常情況下單線程訪問時候由於數組元素被放入到了一個或者多個cache行對代碼執行是有利的,因為數據都在緩存中,代碼執行會更快。

4.1.6 如何解偽共用

​ 解決偽共用最直接的方法就是填充(padding),例如下麵的VolatileLong,一個long占8個位元組,Java的對象頭占用8個位元組(32位系統)或者12位元組(64位系統,預設開啟對象頭壓縮,不開啟占16位元組)。一個緩存行64位元組,那麼我們可以填充6個long(6 * 8 = 48 個位元組)。

4.1.6.1 不使用欄位填充
public class VolatileData {
    // 占用 8個位元組 +48 + 對象頭 = 64位元組

    //需要操作的數據
    volatile long value;

    public VolatileData() {
    }

    public VolatileData(long defValue) {
        value = defValue;
    }

    public long accumulationAdd() {
        //因為單線程操作不需要加鎖
        value++;
        return value;
    }

    public long getValue() {
        return value;
    }
}

記憶體佈局

file

4.6.1.2 填充欄位

因為JDK1.7以後就自動優化代碼會刪除無用的代碼,在JDK1.7以後的版本這些不生效了。

/**
 * 緩存行填充父類
 */
public class DataPadding {
    //填充 5個long類型欄位 8*5 = 40 個位元組
    private long p1, p2, p3, p4, p5; //jvm 優化 刪除無用代碼
    //需要操作的數據
    volatile long value;
}

記憶體佈局

file

4.1.6.3 繼承的方式
/**
 * 緩存行填充父類
 */
public class DataPadding {
    //填充 5個long類型欄位 8*5 = 40 個位元組
    private long p1, p2, p3, p4, p5;
}

繼承緩存填充類

/**
 * 繼承DataPadding
 */
public class VolatileData extends DataPadding {
    // 占用 8個位元組 +48 + 對象頭 = 64位元組

    public VolatileData() {
    }

    public VolatileData(long defValue) {
        value = defValue;
    }

    public long accumulationAdd() {
        //因為單線程操作不需要加鎖
        value++;
        return value;
    }

    public long getValue() {
        return value;
    }
}

記憶體佈局

file

4.1.6.4 Disruptor填充方式
class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
    protected volatile long value;
}

class RhsPadding extends Value {
    protected long p9, p10, p11, p12, p13, p14, p15;
}

繼承填充類

public class VolatileData extends RhsPadding {
    // 占用 8個位元組 +48 + 對象頭 = 64位元組

    //需要操作的數據
    volatile long value;

    public VolatileData() {
    }

    public VolatileData(long defValue) {
        value = defValue;
    }

    public long accumulationAdd() {
        //因為單線程操作不需要加鎖
        value++;
        return value;
    }

    public long getValue() {
        return value;
    }
}

記憶體佈局

file

4.1.6.5 @Contended註解
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.TYPE})
public @interface Contended {
    String value() default "";
}

註解填充類

@Contended
public class VolatileData  {
    // 占用 8個位元組 +48 + 對象頭 = 64位元組

    //需要操作的數據
    volatile long value;
    
    public VolatileData() {
    }

    public VolatileData(long defValue) {
        value = defValue;
    }

    public long accumulationAdd() {
        //因為單線程操作不需要加鎖
        value++;
        return value;
    }

    public long getValue() {
        return value;
    }
}

記憶體佈局

file

註意事項

​ 在Java8中提供了@sun.misc.Contended來避免偽共用時,在運行時需要設置JVM啟動參數-XX:-RestrictContended否則可能不生效。

4.1.7 性能對比
4.1.7.1 測試代碼

使用和不使用緩存行填充的對比

/**
 * 緩存行測試
 */
public class CacheLineTest {
    /**
     * 通過緩存行填充的變數
     */
    private VolatileData volatileData1 = new VolatileData(0);
    private VolatileData volatileData2 = new VolatileData(0);
    private VolatileData volatileData3 = new VolatileData(0);
    private VolatileData volatileData4 = new VolatileData(0);
    private VolatileData volatileData5 = new VolatileData(0);
    private VolatileData volatileData6 = new VolatileData(0);
    private VolatileData volatileData7 = new VolatileData(0);

    /**
     * 迴圈次數
     */
    private final long size = 100000000;

    /**
     * 進行累加操作
     */
    public void accumulationX(VolatileData volatileData) {
        //計算耗時
        long currentTime = System.currentTimeMillis();
        long value = 0;
        //迴圈累加
        for (int i = 0; i < size; i++) {
            //使用緩存行填充的方式
            value = volatileData.accumulationAdd();


        }
        //列印
        System.out.println(value);
        //列印耗時
        System.out.println("耗時:" + (System.currentTimeMillis() - currentTime));
    }


    public static void main(String[] args) {
        //創建對象
        CacheLineTest cacheRowTest = new CacheLineTest();
        //創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        //啟動三個線程個調用他們各自的方法
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData1));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData2));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData3));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData4));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData5));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData6));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData7));
        executorService.shutdown();
    }
}
4.1.7.2 測試數據

同樣的結構他們之間差了 將近 50倍的速度差距

對象 NoPadding(MS) DataPadding(MS) RhsPadding(MS) Contended(MS)
volatileData1 3751 1323 1307 1291
volatileData2 3790 1383 1311 1314
volatileData3 7551 1400 1311 1333
volatileData4 7669 1407 1317 1356
volatileData5 8577 1447 1327 1361
volatileData6 8705 1479 1339 1375
volatileData6 8741 1512 1368 1389
4.1.8 Disruptor解決偽共用

​ 在Disruptor中有一個重要的類Sequence,該類包裝了一個volatile修飾的long類型數據value,無論是Disruptor中的基於數組實現的緩衝區RingBuffer,還是生產者,消費者,都有各自獨立的Sequence,RingBuffer緩衝區中,Sequence標示著寫入進度,例如每次生產者要寫入數據進緩衝區時,都要調用RingBuffer.next()來獲得下一個可使用的相對位置。對於生產者和消費者來說,Sequence標示著它們的事件序號,來看看Sequence類的源碼:

class LhsPadding {
	protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
	protected volatile long value;
}

class RhsPadding extends Value {
	protected long p9, p10, p11, p12, p13, p14, p15;
}

public class Sequence extends RhsPadding {
	static final long INITIAL_VALUE = -1L;
	private static final Unsafe UNSAFE;
	private static final long VALUE_OFFSET;
	static {
		UNSAFE = Util.getUnsafe();
		try {
			VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
		} catch(final Exception e) {
			 throw new RuntimeException(e);
		}
	}
	


    public Sequence() {
        this(INITIAL_VALUE);
    }

    public Sequence(final long initialValue) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
    }

}

​ 從第1到11行可以看到,真正使用到的變數value,它的前後空間都由8個long型的變數填補了,對於一個大小為64位元組的緩存行,它剛好被填補滿(一個long型變數value,8個位元組加上前/後個7long型變數填補,7*8=56,56+8=64位元組)。這樣做每次把變數value讀進高速緩存中時,都能把緩存行填充滿(對於大小為64個位元組的緩存行來說,如果緩存行大小大於64個位元組,那麼還是會出現偽共用問題),保證每次處理數據時都不會與其他變數發生衝突。

4.2 無鎖的設計

4.2.1 鎖機制存在的問題
  • 在多線程競爭下,加鎖、釋放鎖會導致比較多的上下文切換和調度延時,引起性能問題,而且在上下文切換的時候,cpu之前緩存的指令和數據都將失效,對性能有很大的損失,用戶態的鎖雖然避免了這些問題,但是其實它們只是在沒有真實的競爭時才有效。

  • 一個線程持有鎖會導致其它所有需要此鎖的線程掛起直至該鎖釋放。

  • 如果一個優先順序高的線程等待一個優先順序低的線程釋放鎖會導致導致優先順序反轉(Priority Inversion),引起性能風險。

4.2.2 CAS無鎖演算法

​ 實現無鎖(lock-free)的非阻塞演算法有多種實現方法,其中 CAS(比較與交換,Compare and swap) 是一種有名的無鎖演算法。CAS的語義是“我認為V的值應該為A,如果是,那麼將V的值更新為B,否則不修改並告訴V的值實際為多少”,CAS是一種 樂觀鎖 技術,當多個線程嘗試使用CAS同時更新同一個變數時,只有其中一個線程能更新變數的值,而其它線程都失敗,失敗的線程並不會被掛起,而是被告知這次競爭中失敗,並可以再次嘗試。CAS有3個操作數,記憶體值V,舊的預期值A,要修改的新值B。當且僅當預期值A和記憶體值V相同時,將記憶體值V修改為B,否則什麼都不做。

​ 這是一個CPU級別的指令,在我的意識中,它的工作方式有點像樂觀鎖——CPU去更新一個值,但如果想改的值不再是原來的值,操作就失敗,因為很明顯,有其它操作先改變了這個值。

file

註意,這可以是CPU的兩個不同的核心,但不會是兩個獨立的CPU。

​ CAS操作比鎖消耗資源少的多,因為它們不牽涉操作系統,它們直接在CPU上操作。但它們並非沒有代價——在上面的試驗中,單線程無鎖耗時300ms,單線程有鎖耗時10000ms,單線程使用CAS耗時5700ms。所以它比使用鎖耗時少,但比不需要考慮競爭的單線程耗時多。

4.2.3 傳統隊列問題

隊列的底層數據結構一般分成三種:數組、鏈表和堆。其中,堆這裡是為了實現帶有優先順序特性的隊列,暫且不考慮。

隊列 有界性 數據結構
ArrayBlockingQueue bounded 加鎖 arraylist
LinkedBlockingQueue optionally-bounded 加鎖 linkedlist
ConcurrentLinkedQueue unbounded 無鎖 linkedlist
LinkedTransferQueue unbounded 無鎖 linkedlist
PriorityBlockingQueue unbounded 加鎖 heap
DelayQueue unbounded 加鎖 heap

​ 在穩定性和性能要求特別高的系統中,為了防止生產者速度過快,導致記憶體溢出,只能選擇有界隊列;

​ 同時,為了減少Java的垃圾回收對系統性能的影響,會儘量選擇array/heap格式的數據結構。這樣篩選下來,符合條件的隊列就只有ArrayBlockingQueue,但是ArrayBlockingQueue是通過加鎖的方式保證線程安全,而且ArrayBlockingQueue還存在偽共用問題,這兩個問題嚴重影響了性能。

4.2.3.1 Disruptor的無鎖設計

​ 多線程環境下,多個生產者通過do/while迴圈的條件CAS,來判斷每次申請的空間是否已經被其他生產者占據。假如已經被占據,該函數會返回失敗,While迴圈重新執行,申請寫入空間。

do
{
    current = cursor.get();
    next = current + n;

    if (!hasAvailableCapacity(gatingSequences, n, current))
    {
        throw InsufficientCapacityException.INSTANCE;
    }
}
while (!cursor.compareAndSet(current, next));
//next 類比於ArrayBlockQueue的數組索引index
return next;

4.3 環形數組結構

環形數組結構是整個Disruptor的核心所在。

4.3.1 什麼是環形數組

​ RingBuffer 是一個環(首尾相連的環),用做在不同上下文(線程)間傳遞數據的buffer,RingBuffer 擁有一個序號,這個序號指向數組中下一個可用元素。

file

4.3.2 為什麼使用環形數組

為了避免垃圾回收,採用數組而非鏈表。同時,數組對處理器的緩存機制更加友好

​ 首先因為是數組,所以要比鏈表快,而且根據我們對上面緩存行的解釋知道,數組中的一個元素載入,相鄰的數組元素也是會被預載入的,因此在這樣的結構中,cpu無需時不時去主存載入數組中的下一個元素。

​ 而且,你可以為數組預先分配記憶體,使得數組對象一直存在(除非程式終止)。這就意味著不需要花大量的時間用於垃圾回收。

​ 此外,不像鏈表那樣,需要為每一個添加到其上面的對象創造節點對象—對應的,當刪除節點時,需要執行相應的記憶體清理操作。環形數組中的元素採用覆蓋方式,避免了jvm的GC。

​ 其次結構作為環形,數組的大小為2的n次方,這樣元素定位可以通過位運算效率會更高,這個跟一致性哈希中的環形策略有點像。在disruptor中,這個牛逼的環形結構就是RingBuffer,既然是數組,那麼就有大小,而且這個大小必須是2的n次方,結構如下:

file

​ 其實質只是一個普通的數組,只是當放置數據填充滿隊列(即到達2^n-1位置)之後,再填充數據,就會從0開始,覆蓋之前的數據,於是就相當於一個環。

4.4 元素位置定位

​ 數組長度2^n,通過位運算,加快定位的速度。下標採取遞增的形式。不用擔心index溢出的問題。index是long類型,即使100萬QPS的處理速度,也需要30萬年才能用完。

4.5 等待策略

​ 定義 Consumer 如何進行等待下一個事件的策略。 (註:Disruptor 定義了多種不同的策略,針對不同的場景,提供了不一樣的性能表現)根據實際運行環境的 CPU 的硬體特點選擇恰當的策略,並配合特定的 JVM 的配置參數,能夠實現不同的性能提升。

4.5.1 BlockingWaitStrategy

​ Disruptor的預設策略是BlockingWaitStrategy,在BlockingWaitStrategy內部是使用鎖和condition來控制線程的喚醒

​ BlockingWaitStrategy是最低效的策略,但其對CPU的消耗最小並且在各種不同部署環境中能提供更加一致的性能表現。

4.5.2 SleepingWaitStrategy

​ SleepingWaitStrategy 的性能表現跟 BlockingWaitStrategy 差不多,對 CPU 的消耗也類似,但其對生產者線程的影響最小,通過使用LockSupport.parkNanos(1)來實現迴圈等待,適合用於非同步日誌類似的場景;

4.5.3 YieldingWaitStrategy

​ YieldingWaitStrategy是可以使用在低延遲系統的策略之一,YieldingWaitStrategy將自旋以等待序列增加到適當的值。在迴圈體內,將調用Thread.yield()以允許其他排隊的線程運行。在要求極高性能且事件處理線數小於 CPU 邏輯核心數的場景中,推薦使用此策略;

4.5.4 BusySpinWaitStrategy

​ 性能最好,適合用於低延遲的系統。在要求極高性能且事件處理線程數小於CPU邏輯核心數的場景中,推薦使用此策略;

4.5.5 PhasedBackoffWaitStrategy

​ 自旋 + yield + 自定義策略,CPU資源緊缺,吞吐量和延遲並不 的場景。

本文由傳智教育博學谷教研團隊發佈。

如果本文對您有幫助,歡迎關註點贊;如果您有任何建議也可留言評論私信,您的支持是我堅持創作的動力。

轉載請註明出處!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 我的gRPC之旅。使用gRPC一元通信模式和雙向流通信模式寫一個簡單的控制台聊天室。實現創建用戶和實時聊天兩個功能,不考慮高性能。複習了記憶體同步訪問Sync包的使用。用切片緩存聊天記錄,新用戶可以同步聊天記錄。 ...
  • Java基礎知識 Java的三種版本 JavaSE :標準版,主要用於開發桌面程式,控制台開發等等 JavaME:嵌入式開發,主要用於開發手機,小家電等等,目前使用的比較少 JavaEE:企業級開發,主要用於web端開發,伺服器開發等等,是使用十分廣泛的,學好這部分就要學好JavaSE JDK、JR ...
  • 來源:liuchenyang0515.blog.csdn.net/article/details/109263510 對稱加密 兩邊用同一個密鑰來加解密。 A把明文通過某一演算法加密之後得到密文,然後把密文發送給B,B接收到密文之後用相同的密鑰執行相同的演算法去解密。X沒有密鑰,即使竊取到密文也無法竊聽 ...
  • 多用戶即時通訊系統01 1.項目開發流程 2.需求分析 用戶登錄 拉取線上用戶列表 無異常退出(包括客戶端和服務端) 私聊 群聊 發文件 伺服器推送新聞/廣播 3.設計階段 3.1界面設計 用戶登錄: 拉取線上用戶列表: 私聊: 群聊: 發文件: 文件伺服器推送新聞: 3.2通訊系統整體設計 總結: ...
  • 實時展示用戶上傳的頭像 總體思路 """ 1.首先需要給對應的上傳頭像input框綁定一個文本域變化事件 (當檢測到用戶對該文件框上傳了頭像就會觸發一系列操作) 2.再生成一個文件閱讀器對象 3.再獲取用戶上傳的文件頭像 4.把用戶上傳的文件頭像交給文件閱讀器對象FileReader讀取 5.利用文 ...
  • 1 垃圾收集三件事 哪些記憶體需要回收:死去的對象需要回收 什麼時候回收 如何回收 按照jvm記憶體區域劃分原則:程式計數器、虛擬機棧、本地方法棧3個區域的記憶體隨線程創建而劃分,因此線程結束時,記憶體也自動釋放。 本章節分析的是Java堆和方法區的記憶體管理策略 1、虛擬機棧、本地方法棧,棧中的棧幀隨著方法 ...
  • Python中,要想知道一個字元串有多少個字元(獲得字元串長度),或者一個字元串占用多少個位元組,可以使用len()函數。 語法格式: len(string) string 用於指定要進行長度統計的字元串 示例: a = 'www.baidu.com' print(len(a)) 輸出 13 在 Py ...
  • 二、散點圖 import seaborn as sns import matplotlib.pyplot as plt sns.set_theme(style = 'whitegrid') # 載入 diamonds 數據集 diamonds = sns.load_dataset('diamonds ...
一周排行
    -Advertisement-
    Play Games
  • 概述:在C#中,++i和i++都是自增運算符,其中++i先增加值再返回,而i++先返回值再增加。應用場景根據需求選擇,首碼適合先增後用,尾碼適合先用後增。詳細示例提供清晰的代碼演示這兩者的操作時機和實際應用。 在C#中,++i 和 i++ 都是自增運算符,但它們在操作上有細微的差異,主要體現在操作的 ...
  • 上次發佈了:Taurus.MVC 性能壓力測試(ap 壓測 和 linux 下wrk 壓測):.NET Core 版本,今天計劃準備壓測一下 .NET 版本,來測試並記錄一下 Taurus.MVC 框架在 .NET 版本的性能,以便後續持續優化改進。 為了方便對比,本文章的電腦環境和測試思路,儘量和... ...
  • .NET WebAPI作為一種構建RESTful服務的強大工具,為開發者提供了便捷的方式來定義、處理HTTP請求並返迴響應。在設計API介面時,正確地接收和解析客戶端發送的數據至關重要。.NET WebAPI提供了一系列特性,如[FromRoute]、[FromQuery]和[FromBody],用 ...
  • 原因:我之所以想做這個項目,是因為在之前查找關於C#/WPF相關資料時,我發現講解圖像濾鏡的資源非常稀缺。此外,我註意到許多現有的開源庫主要基於CPU進行圖像渲染。這種方式在處理大量圖像時,會導致CPU的渲染負擔過重。因此,我將在下文中介紹如何通過GPU渲染來有效實現圖像的各種濾鏡效果。 生成的效果 ...
  • 引言 上一章我們介紹了在xUnit單元測試中用xUnit.DependencyInject來使用依賴註入,上一章我們的Sample.Repository倉儲層有一個批量註入的介面沒有做單元測試,今天用這個示例來演示一下如何用Bogus創建模擬數據 ,和 EFCore 的種子數據生成 Bogus 的優 ...
  • 一、前言 在自己的項目中,涉及到實時心率曲線的繪製,項目上的曲線繪製,一般很難找到能直接用的第三方庫,而且有些還是定製化的功能,所以還是自己繪製比較方便。很多人一聽到自己畫就害怕,感覺很難,今天就分享一個完整的實時心率數據繪製心率曲線圖的例子;之前的博客也分享給DrawingVisual繪製曲線的方 ...
  • 如果你在自定義的 Main 方法中直接使用 App 類並啟動應用程式,但發現 App.xaml 中定義的資源沒有被正確載入,那麼問題可能在於如何正確配置 App.xaml 與你的 App 類的交互。 確保 App.xaml 文件中的 x:Class 屬性正確指向你的 App 類。這樣,當你創建 Ap ...
  • 一:背景 1. 講故事 上個月有個朋友在微信上找到我,說他們的軟體在客戶那邊隔幾天就要崩潰一次,一直都沒有找到原因,讓我幫忙看下怎麼回事,確實工控類的軟體環境複雜難搞,朋友手上有一個崩潰的dump,剛好丟給我來分析一下。 二:WinDbg分析 1. 程式為什麼會崩潰 windbg 有一個厲害之處在於 ...
  • 前言 .NET生態中有許多依賴註入容器。在大多數情況下,微軟提供的內置容器在易用性和性能方面都非常優秀。外加ASP.NET Core預設使用內置容器,使用很方便。 但是筆者在使用中一直有一個頭疼的問題:服務工廠無法提供請求的服務類型相關的信息。這在一般情況下並沒有影響,但是內置容器支持註冊開放泛型服 ...
  • 一、前言 在項目開發過程中,DataGrid是經常使用到的一個數據展示控制項,而通常表格的最後一列是作為操作列存在,比如會有編輯、刪除等功能按鈕。但WPF的原始DataGrid中,預設只支持固定左側列,這跟大家習慣性操作列放最後不符,今天就來介紹一種簡單的方式實現固定右側列。(這裡的實現方式參考的大佬 ...