Disruptor測試結果運算1億次,耗時5503ms,吞吐量18171000/s,於是我扒開了Disruptor高性能的外衣

来源:https://www.cnblogs.com/jiagooushi/archive/2022/09/20/16711199.html
-Advertisement-
Play Games

能對比測試 為了直觀地感受 Disruptor 有多快,設計了一個性能對比測試:Producer 發佈 1 億次事件,從發佈第一個事件開始計時,捕捉 Consumer 處理完所有事件的耗時。 測試用例在 Producer 如何將事件通知到 Consumer 的實現方式上,設計了兩種不同的實現: Pr ...


能對比測試

為了直觀地感受 Disruptor 有多快,設計了一個性能對比測試:Producer 發佈 1 億次事件,從發佈第一個事件開始計時,捕捉 Consumer 處理完所有事件的耗時。

測試用例在 Producer 如何將事件通知到 Consumer 的實現方式上,設計了兩種不同的實現:

  1. Producer 的事件發佈和 Consumer 的事件處理在不同的線程,通過 ArrayBlockingQueue 傳遞給 Consumer 進行處理;
  2. Producer 的事件發佈和 Consumer 的事件處理在不同的線程,通過 Disruptor 傳遞給 Consumer 進行處理;

3.1 代碼實現

3.1.1 計算代碼

進行CAS累加運算

public class CommonUtils {
    private static AtomicLong count = new AtomicLong(0);

    public static void calculation() {
        count.incrementAndGet();
    }

    public static long get() {
        return count.get();
    }
}
3.1.2 抽象類

進行一億次 CAS運算計算耗時

/**
 * 抽象類
 *
 * @param <T>
 */
public abstract class AbstractTask<T> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractTask.class);
    //線程池
    private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    //一億次測試
    public static long tasksize = 100000000;


    /**
     * 開始調用測試
     */
    public void invok() {
        //計算當前事件
        long currentTime = System.currentTimeMillis();
        //獲取到監聽器
        Runnable monitor = monitor();
        if (null != monitor) {
            executor.execute(monitor);
        }
        //啟動
        start();

        //執行任務發佈
        Runnable runnable = getTask();
        for (long i = 0; i < tasksize; i++) {
            runnable.run();
        }

        //停止任務
        stop();
        //等待任務發佈完成
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executor.shutdown();
        //獲取處理結果
        T result = getResult();
        //計算耗時
        long duration = System.currentTimeMillis() - currentTime;
        //計算吞吐量
        long throughput = (tasksize / duration) * 1000;
        logger.info("每秒吞吐量:[{}];({}/{})", throughput, result, duration);
    }


    /**
     * 獲取監聽器
     *
     * @return
     */
    public Runnable monitor() {
        return null;
    }

    /**
     * 啟動任務
     */
    public void start() {

    }

    /**
     * 完成任務
     */
    public void complete() {
        countDownLatch.countDown();
    }

    /**
     * 停止任務
     */
    public void stop() {

    }

    /**
     * 獲取需要執行的任務
     *
     * @return
     */
    public abstract Runnable getTask();

    /**
     * 獲取運行結果
     *
     * @return
     */
    public abstract T getResult();
}

3.1.3 Disruptor性能測試代碼
public class DisruptorTest extends AbstractTask<Long> {
    //定義隨機數生成器
    private static final Random r = new Random();
    //定義Disruptor對象
    private Disruptor disruptor = null;
    //定義Disruptor事件發佈對象
    private LongEventProducerWithTranslator translator = null;

    /**
     * 啟動
     */
    @Override
    public void start() {
        //定義事件工廠
        EventFactory<LongEvent> eventFactory = new LongEventFactory();
        // RingBuffer 大小,必須是 2 的 N 次方;
        int ringBufferSize = 1024 * 1024;
        //構建disruptor對象
        disruptor = new Disruptor<LongEvent>(eventFactory,
                ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE,
                new YieldingWaitStrategy());
        //定義事件處理類
        EventHandler<LongEvent> eventHandler = new LongEventHandler();
        //配置事件處理類
        disruptor.handleEventsWith(eventHandler);
        //啟動disruptor
        disruptor.start();
        //創建事件發佈對象
        translator = new LongEventProducerWithTranslator(disruptor.getRingBuffer());
    }

    /**
     * 停止任務
     */
    @Override
    public void stop() {
        disruptor.shutdown();
        System.out.println("運算結果:" + CommonUtils.get());
        //完成任務
        complete();
    }

    /**
     * 獲取需要執行的任務
     *
     * @return
     */
    @Override
    public Runnable getTask() {
        return () -> {
            publishEvent();
        };
    }

    /**
     * 獲取運行結果
     *
     * @return
     */
    @Override
    public Long getResult() {
        return CommonUtils.get();
    }


    /**
     * 發佈對象
     */
    private void publishEvent() {
        //獲取要通過事件傳遞的業務數據
        Long data = r.nextLong();
        // 發佈事件
        translator.onData(data);
    }


    public static void main(String[] args) {
        DisruptorTest disruptorTest = new DisruptorTest();
        disruptorTest.invok();
    }

}

輸出結果

10:45:22.941 [main] INFO com.heima.task.AbstractTask - 每秒吞吐量:[18171000];(100000000/5503)
ArrayBlockingQueue性能測試代碼
public class ArrayBlockingQueueTest extends AbstractTask {
    private static final Random r = new Random();
    private static final ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue(10000000);


    @Override
    public Runnable monitor() {
        return () -> {
            try {
                for (int i = 0; i < tasksize; i++) {
                    //獲取一個元素
                    queue.take();
                    //執行計算
                    CommonUtils.calculation();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            complete();
        };
    }

    public static void main(String[] args) {
        ArrayBlockingQueueTest test = new ArrayBlockingQueueTest();
        test.invok();
    }

    @Override
    public Runnable getTask() {
        return () -> {
            publish();
        };
    }

    @Override
    public Object getResult() {
        return CommonUtils.get();
    }

    public void publish() {
        Long data = r.nextLong();
        try {
            queue.put(data);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

輸出結果

10:45:46.379 [main] INFO com.heima.task.AbstractTask - 每秒吞吐量:[6192000];(100000000/16148)

3.2 測試對比

測試類 運算次數 耗時(ms) 吞吐量/s
ArrayBlockingQueue 1億次 16148 6192000
Disruptor 1億次 5503 18171000

3.3 Disruptor官方性能測試

Disruptor論文中講述了一個實驗:

  • 這個測試程式調用了一個函數,該函數會對一個64位的計數器迴圈自增5億次。
  • 機器環境:2.4G 6核
  • 運算: 64位的計數器累加5億次
Method Time (ms)
單線程 300
單線程使用 CAS 5,700
單線程使用鎖 10,000
單線程使用volatile 4,700
多線程使用 CAS 30,000
多線程使用鎖 224,000

4. 高性能原理

  • 引入環形的數組結構:數組元素不會被回收,避免頻繁的GC,
  • 無鎖的設計:採用CAS無鎖方式,保證線程的安全性
  • 屬性填充:通過添加額外的無用信息,避免偽共用問題
  • 元素位置的定位:採用跟一致性哈希一樣的方式,一個索引,進行自增

4.1 偽共用概念

4.1.1 電腦緩存構成

​ 下圖是計算的基本結構。L1、L2、L3分別表示一級緩存、二級緩存、三級緩存,越靠近CPU的緩存,速度越快,容量也越小,所以L1緩存很小但很快,並且緊靠著在使用它的CPU內核;L2大一些,也慢一些,並且仍然只能被一個單獨的CPU核使用;L3更大、更慢,並且被單個插槽上的所有CPU核共用;最後是主存,由全部插槽上的所有CPU核共用。

file

​ 當CPU要讀取一個數據時,首先從一級緩存中查找,如果沒有找到再從二級緩存中查找,如果還是沒有就從三級緩存或記憶體中查找。一般來說,每級緩存的命中率大概都在80%左右,也就是說全部數據量的80%都可以在一級緩存中找到,只剩下20%的總數據量才需要從二級緩存、三級緩存或記憶體中讀取,由此可見一級緩存是整個CPU緩存架構中最為重要的部分。

file

下表是一些緩存未命中的消耗數據:

從CPU到 大約需要的CPU周期 大約需要的時間
主存 約60-80ns
QPI匯流排 約20ns
L3 cache 約40-45cycles 約15ns
L2 cache 約10cycles 約3ns
L1 cache 約3-4cycles 約1ns
寄存器 1cycle

可見CPU讀取主存中的數據會比從L1中讀取慢了近2個數量級。

4.1.2 什麼是緩存行

​ 為瞭解決電腦系統中主記憶體與 CPU 之間運行速度差問題,會在 CPU 與主記憶體之間 添加一級或者多級高速緩衝存儲器( Cache)。這個 Cache 一般是被集成到 CPU 內部的, 所以也叫 CPU Cache,如圖所示是兩級 Cache 結構。

file

​ Cache內部是按行存儲的,其中每一行稱為一個cache line,由很多個 Cache line 組成的,Cache line 是 cache 和 RAM 交換數據的最小單位,cache行的大小一般為2的冪次數位元組,通常為 64 Byte。Cache line是Cache與主記憶體進行數據交換的單位。

file

​ 當 CPU 把記憶體的數據載入 cache 時,會把臨近的共 64 Byte 的數據一同放入同一個Cache line,因為空間局部性:臨近的數據在將來被訪問的可能性大。

linux 查看緩存行大小

more /sys/devices/system/cpu/cpu1/cache/index0/coherency_line_size
64
4.1.3 什麼是共用

​ CPU緩存是以緩存行(cache line)為單位存儲的。緩存行通常是 64 位元組,並且它有效地引用主記憶體中的一塊地址。一個 Java 的 long 類型是 8 位元組,因此在一個緩存行中可以存 8 個 long 類型的變數。所以,如果你訪問一個 long 數組,當數組中的一個值被載入到緩存中,它會額外載入另外 7 個,以致你能非常快地遍歷這個數組。事實上,你可以非常快速的遍歷在連續的記憶體塊中分配的任意數據結構。而如果你在數據結構中的項在記憶體中不是彼此相鄰的(如鏈表),你將得不到免費緩存載入所帶來的優勢,並且在這些數據結構中的每一個項都可能會出現緩存未命中。下圖是一個CPU緩存行的示意圖:

file

​ 錶面上 X 和 Y 都是被獨立線程操作的,而且兩操作之間也沒有任何關係。只不過它們共用了一個緩存行,但所有競爭衝突都是來源於共用。

4.1.4 什麼是偽共用

​ 當CPU訪問某一個變數時候,首先會去看CPU Cache內是否有該變數,如果有則直接從中獲取,否者就去主記憶體裡面獲取該變數,然後把該變數所在記憶體區域的一個Cache行大小的記憶體拷貝到Cache(cache行是Cache與主記憶體進行數據交換的單位)。

​ 由於存放到Cache行的的是記憶體塊而不是單個變數,所以可能會把多個變數存放到了一個cache行。當多個線程同時修改一個緩存行裡面的多個變數時候,由於同時只能有一個線程操作緩存行,所以相比每個變數放到一個緩存行性能會有所下降,這就是偽共用。

file

​ 如上圖變數x,y同時被放到了CPU的一級和二級緩存,當線程1使用CPU1對變數x進行更新時候,首先會修改cpu1的一級緩存變數x所在緩存行,這時候緩存一致性協議會導致cpu2中變數x對應的緩存行失效,那麼線程2寫入變數x的時候就只能去二級緩存去查找,這就破壞了一級緩存,而一級緩存比二級緩存更快。更壞的情況下如果cpu只有一級緩存,那麼會導致頻繁的直接訪問主記憶體。

​ 我們的緩存都是以緩存行作為一個單位來處理的,所以失效x的緩存的同時,也會把y失效,反之亦然。

4.1.5 為何會出現偽共用

​ 偽共用的產生是因為多個變數被放入了一個緩存行,並且多個線程同時去寫入緩存行中不同變數。那麼為何多個變數會被放入一個緩存行那。其實是因為Cache與記憶體交換數據的單位就是Cache line,當CPU要訪問的變數沒有在Cache命中時候,根據程式運行的局部性原理會把該變數在記憶體中大小為Cache行的記憶體放如緩存行。

long a;
long b;
long c;
long d;

​ 如上代碼,聲明瞭四個long變數,假設cache line的大小為32個位元組,那麼當cpu訪問變數a時候發現該變數沒有在cache命中,那麼就會去主記憶體把變數a以及記憶體地址附近的b,c,d放入緩存行。也就是地址連續的多個變數才有可能會被放到一個緩存行中,當創建數組時候,數組裡面的多個元素就會被放入到同一個緩存行。那麼單線程下多個變數放入緩存行對性能有影響?其實正常情況下單線程訪問時候由於數組元素被放入到了一個或者多個cache行對代碼執行是有利的,因為數據都在緩存中,代碼執行會更快。

4.1.6 如何解偽共用

​ 解決偽共用最直接的方法就是填充(padding),例如下麵的VolatileLong,一個long占8個位元組,Java的對象頭占用8個位元組(32位系統)或者12位元組(64位系統,預設開啟對象頭壓縮,不開啟占16位元組)。一個緩存行64位元組,那麼我們可以填充6個long(6 * 8 = 48 個位元組)。

4.1.6.1 不使用欄位填充
public class VolatileData {
    // 占用 8個位元組 +48 + 對象頭 = 64位元組

    //需要操作的數據
    volatile long value;

    public VolatileData() {
    }

    public VolatileData(long defValue) {
        value = defValue;
    }

    public long accumulationAdd() {
        //因為單線程操作不需要加鎖
        value++;
        return value;
    }

    public long getValue() {
        return value;
    }
}

記憶體佈局

file

4.6.1.2 填充欄位

因為JDK1.7以後就自動優化代碼會刪除無用的代碼,在JDK1.7以後的版本這些不生效了。

/**
 * 緩存行填充父類
 */
public class DataPadding {
    //填充 5個long類型欄位 8*5 = 40 個位元組
    private long p1, p2, p3, p4, p5; //jvm 優化 刪除無用代碼
    //需要操作的數據
    volatile long value;
}

記憶體佈局

file

4.1.6.3 繼承的方式
/**
 * 緩存行填充父類
 */
public class DataPadding {
    //填充 5個long類型欄位 8*5 = 40 個位元組
    private long p1, p2, p3, p4, p5;
}

繼承緩存填充類

/**
 * 繼承DataPadding
 */
public class VolatileData extends DataPadding {
    // 占用 8個位元組 +48 + 對象頭 = 64位元組

    public VolatileData() {
    }

    public VolatileData(long defValue) {
        value = defValue;
    }

    public long accumulationAdd() {
        //因為單線程操作不需要加鎖
        value++;
        return value;
    }

    public long getValue() {
        return value;
    }
}

記憶體佈局

file

4.1.6.4 Disruptor填充方式
class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
    protected volatile long value;
}

class RhsPadding extends Value {
    protected long p9, p10, p11, p12, p13, p14, p15;
}

繼承填充類

public class VolatileData extends RhsPadding {
    // 占用 8個位元組 +48 + 對象頭 = 64位元組

    //需要操作的數據
    volatile long value;

    public VolatileData() {
    }

    public VolatileData(long defValue) {
        value = defValue;
    }

    public long accumulationAdd() {
        //因為單線程操作不需要加鎖
        value++;
        return value;
    }

    public long getValue() {
        return value;
    }
}

記憶體佈局

file

4.1.6.5 @Contended註解
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.TYPE})
public @interface Contended {
    String value() default "";
}

註解填充類

@Contended
public class VolatileData  {
    // 占用 8個位元組 +48 + 對象頭 = 64位元組

    //需要操作的數據
    volatile long value;
    
    public VolatileData() {
    }

    public VolatileData(long defValue) {
        value = defValue;
    }

    public long accumulationAdd() {
        //因為單線程操作不需要加鎖
        value++;
        return value;
    }

    public long getValue() {
        return value;
    }
}

記憶體佈局

file

註意事項

​ 在Java8中提供了@sun.misc.Contended來避免偽共用時,在運行時需要設置JVM啟動參數-XX:-RestrictContended否則可能不生效。

4.1.7 性能對比
4.1.7.1 測試代碼

使用和不使用緩存行填充的對比

/**
 * 緩存行測試
 */
public class CacheLineTest {
    /**
     * 通過緩存行填充的變數
     */
    private VolatileData volatileData1 = new VolatileData(0);
    private VolatileData volatileData2 = new VolatileData(0);
    private VolatileData volatileData3 = new VolatileData(0);
    private VolatileData volatileData4 = new VolatileData(0);
    private VolatileData volatileData5 = new VolatileData(0);
    private VolatileData volatileData6 = new VolatileData(0);
    private VolatileData volatileData7 = new VolatileData(0);

    /**
     * 迴圈次數
     */
    private final long size = 100000000;

    /**
     * 進行累加操作
     */
    public void accumulationX(VolatileData volatileData) {
        //計算耗時
        long currentTime = System.currentTimeMillis();
        long value = 0;
        //迴圈累加
        for (int i = 0; i < size; i++) {
            //使用緩存行填充的方式
            value = volatileData.accumulationAdd();


        }
        //列印
        System.out.println(value);
        //列印耗時
        System.out.println("耗時:" + (System.currentTimeMillis() - currentTime));
    }


    public static void main(String[] args) {
        //創建對象
        CacheLineTest cacheRowTest = new CacheLineTest();
        //創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        //啟動三個線程個調用他們各自的方法
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData1));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData2));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData3));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData4));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData5));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData6));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData7));
        executorService.shutdown();
    }
}
4.1.7.2 測試數據

同樣的結構他們之間差了 將近 50倍的速度差距

對象 NoPadding(MS) DataPadding(MS) RhsPadding(MS) Contended(MS)
volatileData1 3751 1323 1307 1291
volatileData2 3790 1383 1311 1314
volatileData3 7551 1400 1311 1333
volatileData4 7669 1407 1317 1356
volatileData5 8577 1447 1327 1361
volatileData6 8705 1479 1339 1375
volatileData6 8741 1512 1368 1389
4.1.8 Disruptor解決偽共用

​ 在Disruptor中有一個重要的類Sequence,該類包裝了一個volatile修飾的long類型數據value,無論是Disruptor中的基於數組實現的緩衝區RingBuffer,還是生產者,消費者,都有各自獨立的Sequence,RingBuffer緩衝區中,Sequence標示著寫入進度,例如每次生產者要寫入數據進緩衝區時,都要調用RingBuffer.next()來獲得下一個可使用的相對位置。對於生產者和消費者來說,Sequence標示著它們的事件序號,來看看Sequence類的源碼:

class LhsPadding {
	protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
	protected volatile long value;
}

class RhsPadding extends Value {
	protected long p9, p10, p11, p12, p13, p14, p15;
}

public class Sequence extends RhsPadding {
	static final long INITIAL_VALUE = -1L;
	private static final Unsafe UNSAFE;
	private static final long VALUE_OFFSET;
	static {
		UNSAFE = Util.getUnsafe();
		try {
			VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
		} catch(final Exception e) {
			 throw new RuntimeException(e);
		}
	}
	


    public Sequence() {
        this(INITIAL_VALUE);
    }

    public Sequence(final long initialValue) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
    }

}

​ 從第1到11行可以看到,真正使用到的變數value,它的前後空間都由8個long型的變數填補了,對於一個大小為64位元組的緩存行,它剛好被填補滿(一個long型變數value,8個位元組加上前/後個7long型變數填補,7*8=56,56+8=64位元組)。這樣做每次把變數value讀進高速緩存中時,都能把緩存行填充滿(對於大小為64個位元組的緩存行來說,如果緩存行大小大於64個位元組,那麼還是會出現偽共用問題),保證每次處理數據時都不會與其他變數發生衝突。

4.2 無鎖的設計

4.2.1 鎖機制存在的問題
  • 在多線程競爭下,加鎖、釋放鎖會導致比較多的上下文切換和調度延時,引起性能問題,而且在上下文切換的時候,cpu之前緩存的指令和數據都將失效,對性能有很大的損失,用戶態的鎖雖然避免了這些問題,但是其實它們只是在沒有真實的競爭時才有效。

  • 一個線程持有鎖會導致其它所有需要此鎖的線程掛起直至該鎖釋放。

  • 如果一個優先順序高的線程等待一個優先順序低的線程釋放鎖會導致導致優先順序反轉(Priority Inversion),引起性能風險。

4.2.2 CAS無鎖演算法

​ 實現無鎖(lock-free)的非阻塞演算法有多種實現方法,其中 CAS(比較與交換,Compare and swap) 是一種有名的無鎖演算法。CAS的語義是“我認為V的值應該為A,如果是,那麼將V的值更新為B,否則不修改並告訴V的值實際為多少”,CAS是一種 樂觀鎖 技術,當多個線程嘗試使用CAS同時更新同一個變數時,只有其中一個線程能更新變數的值,而其它線程都失敗,失敗的線程並不會被掛起,而是被告知這次競爭中失敗,並可以再次嘗試。CAS有3個操作數,記憶體值V,舊的預期值A,要修改的新值B。當且僅當預期值A和記憶體值V相同時,將記憶體值V修改為B,否則什麼都不做。

​ 這是一個CPU級別的指令,在我的意識中,它的工作方式有點像樂觀鎖——CPU去更新一個值,但如果想改的值不再是原來的值,操作就失敗,因為很明顯,有其它操作先改變了這個值。

file

註意,這可以是CPU的兩個不同的核心,但不會是兩個獨立的CPU。

​ CAS操作比鎖消耗資源少的多,因為它們不牽涉操作系統,它們直接在CPU上操作。但它們並非沒有代價——在上面的試驗中,單線程無鎖耗時300ms,單線程有鎖耗時10000ms,單線程使用CAS耗時5700ms。所以它比使用鎖耗時少,但比不需要考慮競爭的單線程耗時多。

4.2.3 傳統隊列問題

隊列的底層數據結構一般分成三種:數組、鏈表和堆。其中,堆這裡是為了實現帶有優先順序特性的隊列,暫且不考慮。

隊列 有界性 數據結構
ArrayBlockingQueue bounded 加鎖 arraylist
LinkedBlockingQueue optionally-bounded 加鎖 linkedlist
ConcurrentLinkedQueue unbounded 無鎖 linkedlist
LinkedTransferQueue unbounded 無鎖 linkedlist
PriorityBlockingQueue unbounded 加鎖 heap
DelayQueue unbounded 加鎖 heap

​ 在穩定性和性能要求特別高的系統中,為了防止生產者速度過快,導致記憶體溢出,只能選擇有界隊列;

​ 同時,為了減少Java的垃圾回收對系統性能的影響,會儘量選擇array/heap格式的數據結構。這樣篩選下來,符合條件的隊列就只有ArrayBlockingQueue,但是ArrayBlockingQueue是通過加鎖的方式保證線程安全,而且ArrayBlockingQueue還存在偽共用問題,這兩個問題嚴重影響了性能。

4.2.3.1 Disruptor的無鎖設計

​ 多線程環境下,多個生產者通過do/while迴圈的條件CAS,來判斷每次申請的空間是否已經被其他生產者占據。假如已經被占據,該函數會返回失敗,While迴圈重新執行,申請寫入空間。

do
{
    current = cursor.get();
    next = current + n;

    if (!hasAvailableCapacity(gatingSequences, n, current))
    {
        throw InsufficientCapacityException.INSTANCE;
    }
}
while (!cursor.compareAndSet(current, next));
//next 類比於ArrayBlockQueue的數組索引index
return next;

4.3 環形數組結構

環形數組結構是整個Disruptor的核心所在。

4.3.1 什麼是環形數組

​ RingBuffer 是一個環(首尾相連的環),用做在不同上下文(線程)間傳遞數據的buffer,RingBuffer 擁有一個序號,這個序號指向數組中下一個可用元素。

file

4.3.2 為什麼使用環形數組

為了避免垃圾回收,採用數組而非鏈表。同時,數組對處理器的緩存機制更加友好

​ 首先因為是數組,所以要比鏈表快,而且根據我們對上面緩存行的解釋知道,數組中的一個元素載入,相鄰的數組元素也是會被預載入的,因此在這樣的結構中,cpu無需時不時去主存載入數組中的下一個元素。

​ 而且,你可以為數組預先分配記憶體,使得數組對象一直存在(除非程式終止)。這就意味著不需要花大量的時間用於垃圾回收。

​ 此外,不像鏈表那樣,需要為每一個添加到其上面的對象創造節點對象—對應的,當刪除節點時,需要執行相應的記憶體清理操作。環形數組中的元素採用覆蓋方式,避免了jvm的GC。

​ 其次結構作為環形,數組的大小為2的n次方,這樣元素定位可以通過位運算效率會更高,這個跟一致性哈希中的環形策略有點像。在disruptor中,這個牛逼的環形結構就是RingBuffer,既然是數組,那麼就有大小,而且這個大小必須是2的n次方,結構如下:

file

​ 其實質只是一個普通的數組,只是當放置數據填充滿隊列(即到達2^n-1位置)之後,再填充數據,就會從0開始,覆蓋之前的數據,於是就相當於一個環。

4.4 元素位置定位

​ 數組長度2^n,通過位運算,加快定位的速度。下標採取遞增的形式。不用擔心index溢出的問題。index是long類型,即使100萬QPS的處理速度,也需要30萬年才能用完。

4.5 等待策略

​ 定義 Consumer 如何進行等待下一個事件的策略。 (註:Disruptor 定義了多種不同的策略,針對不同的場景,提供了不一樣的性能表現)根據實際運行環境的 CPU 的硬體特點選擇恰當的策略,並配合特定的 JVM 的配置參數,能夠實現不同的性能提升。

4.5.1 BlockingWaitStrategy

​ Disruptor的預設策略是BlockingWaitStrategy,在BlockingWaitStrategy內部是使用鎖和condition來控制線程的喚醒

​ BlockingWaitStrategy是最低效的策略,但其對CPU的消耗最小並且在各種不同部署環境中能提供更加一致的性能表現。

4.5.2 SleepingWaitStrategy

​ SleepingWaitStrategy 的性能表現跟 BlockingWaitStrategy 差不多,對 CPU 的消耗也類似,但其對生產者線程的影響最小,通過使用LockSupport.parkNanos(1)來實現迴圈等待,適合用於非同步日誌類似的場景;

4.5.3 YieldingWaitStrategy

​ YieldingWaitStrategy是可以使用在低延遲系統的策略之一,YieldingWaitStrategy將自旋以等待序列增加到適當的值。在迴圈體內,將調用Thread.yield()以允許其他排隊的線程運行。在要求極高性能且事件處理線數小於 CPU 邏輯核心數的場景中,推薦使用此策略;

4.5.4 BusySpinWaitStrategy

​ 性能最好,適合用於低延遲的系統。在要求極高性能且事件處理線程數小於CPU邏輯核心數的場景中,推薦使用此策略;

4.5.5 PhasedBackoffWaitStrategy

​ 自旋 + yield + 自定義策略,CPU資源緊缺,吞吐量和延遲並不 的場景。

本文由傳智教育博學谷教研團隊發佈。

如果本文對您有幫助,歡迎關註點贊;如果您有任何建議也可留言評論私信,您的支持是我堅持創作的動力。

轉載請註明出處!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 我的gRPC之旅。使用gRPC一元通信模式和雙向流通信模式寫一個簡單的控制台聊天室。實現創建用戶和實時聊天兩個功能,不考慮高性能。複習了記憶體同步訪問Sync包的使用。用切片緩存聊天記錄,新用戶可以同步聊天記錄。 ...
  • Java基礎知識 Java的三種版本 JavaSE :標準版,主要用於開發桌面程式,控制台開發等等 JavaME:嵌入式開發,主要用於開發手機,小家電等等,目前使用的比較少 JavaEE:企業級開發,主要用於web端開發,伺服器開發等等,是使用十分廣泛的,學好這部分就要學好JavaSE JDK、JR ...
  • 來源:liuchenyang0515.blog.csdn.net/article/details/109263510 對稱加密 兩邊用同一個密鑰來加解密。 A把明文通過某一演算法加密之後得到密文,然後把密文發送給B,B接收到密文之後用相同的密鑰執行相同的演算法去解密。X沒有密鑰,即使竊取到密文也無法竊聽 ...
  • 多用戶即時通訊系統01 1.項目開發流程 2.需求分析 用戶登錄 拉取線上用戶列表 無異常退出(包括客戶端和服務端) 私聊 群聊 發文件 伺服器推送新聞/廣播 3.設計階段 3.1界面設計 用戶登錄: 拉取線上用戶列表: 私聊: 群聊: 發文件: 文件伺服器推送新聞: 3.2通訊系統整體設計 總結: ...
  • 實時展示用戶上傳的頭像 總體思路 """ 1.首先需要給對應的上傳頭像input框綁定一個文本域變化事件 (當檢測到用戶對該文件框上傳了頭像就會觸發一系列操作) 2.再生成一個文件閱讀器對象 3.再獲取用戶上傳的文件頭像 4.把用戶上傳的文件頭像交給文件閱讀器對象FileReader讀取 5.利用文 ...
  • 1 垃圾收集三件事 哪些記憶體需要回收:死去的對象需要回收 什麼時候回收 如何回收 按照jvm記憶體區域劃分原則:程式計數器、虛擬機棧、本地方法棧3個區域的記憶體隨線程創建而劃分,因此線程結束時,記憶體也自動釋放。 本章節分析的是Java堆和方法區的記憶體管理策略 1、虛擬機棧、本地方法棧,棧中的棧幀隨著方法 ...
  • Python中,要想知道一個字元串有多少個字元(獲得字元串長度),或者一個字元串占用多少個位元組,可以使用len()函數。 語法格式: len(string) string 用於指定要進行長度統計的字元串 示例: a = 'www.baidu.com' print(len(a)) 輸出 13 在 Py ...
  • 二、散點圖 import seaborn as sns import matplotlib.pyplot as plt sns.set_theme(style = 'whitegrid') # 載入 diamonds 數據集 diamonds = sns.load_dataset('diamonds ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...