Disruptor測試結果運算1億次,耗時5503ms,吞吐量18171000/s,於是我扒開了Disruptor高性能的外衣

来源:https://www.cnblogs.com/jiagooushi/archive/2022/09/20/16711199.html
-Advertisement-
Play Games

能對比測試 為了直觀地感受 Disruptor 有多快,設計了一個性能對比測試:Producer 發佈 1 億次事件,從發佈第一個事件開始計時,捕捉 Consumer 處理完所有事件的耗時。 測試用例在 Producer 如何將事件通知到 Consumer 的實現方式上,設計了兩種不同的實現: Pr ...


能對比測試

為了直觀地感受 Disruptor 有多快,設計了一個性能對比測試:Producer 發佈 1 億次事件,從發佈第一個事件開始計時,捕捉 Consumer 處理完所有事件的耗時。

測試用例在 Producer 如何將事件通知到 Consumer 的實現方式上,設計了兩種不同的實現:

  1. Producer 的事件發佈和 Consumer 的事件處理在不同的線程,通過 ArrayBlockingQueue 傳遞給 Consumer 進行處理;
  2. Producer 的事件發佈和 Consumer 的事件處理在不同的線程,通過 Disruptor 傳遞給 Consumer 進行處理;

3.1 代碼實現

3.1.1 計算代碼

進行CAS累加運算

public class CommonUtils {
    private static AtomicLong count = new AtomicLong(0);

    public static void calculation() {
        count.incrementAndGet();
    }

    public static long get() {
        return count.get();
    }
}
3.1.2 抽象類

進行一億次 CAS運算計算耗時

/**
 * 抽象類
 *
 * @param <T>
 */
public abstract class AbstractTask<T> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractTask.class);
    //線程池
    private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    //一億次測試
    public static long tasksize = 100000000;


    /**
     * 開始調用測試
     */
    public void invok() {
        //計算當前事件
        long currentTime = System.currentTimeMillis();
        //獲取到監聽器
        Runnable monitor = monitor();
        if (null != monitor) {
            executor.execute(monitor);
        }
        //啟動
        start();

        //執行任務發佈
        Runnable runnable = getTask();
        for (long i = 0; i < tasksize; i++) {
            runnable.run();
        }

        //停止任務
        stop();
        //等待任務發佈完成
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executor.shutdown();
        //獲取處理結果
        T result = getResult();
        //計算耗時
        long duration = System.currentTimeMillis() - currentTime;
        //計算吞吐量
        long throughput = (tasksize / duration) * 1000;
        logger.info("每秒吞吐量:[{}];({}/{})", throughput, result, duration);
    }


    /**
     * 獲取監聽器
     *
     * @return
     */
    public Runnable monitor() {
        return null;
    }

    /**
     * 啟動任務
     */
    public void start() {

    }

    /**
     * 完成任務
     */
    public void complete() {
        countDownLatch.countDown();
    }

    /**
     * 停止任務
     */
    public void stop() {

    }

    /**
     * 獲取需要執行的任務
     *
     * @return
     */
    public abstract Runnable getTask();

    /**
     * 獲取運行結果
     *
     * @return
     */
    public abstract T getResult();
}

3.1.3 Disruptor性能測試代碼
public class DisruptorTest extends AbstractTask<Long> {
    //定義隨機數生成器
    private static final Random r = new Random();
    //定義Disruptor對象
    private Disruptor disruptor = null;
    //定義Disruptor事件發佈對象
    private LongEventProducerWithTranslator translator = null;

    /**
     * 啟動
     */
    @Override
    public void start() {
        //定義事件工廠
        EventFactory<LongEvent> eventFactory = new LongEventFactory();
        // RingBuffer 大小,必須是 2 的 N 次方;
        int ringBufferSize = 1024 * 1024;
        //構建disruptor對象
        disruptor = new Disruptor<LongEvent>(eventFactory,
                ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE,
                new YieldingWaitStrategy());
        //定義事件處理類
        EventHandler<LongEvent> eventHandler = new LongEventHandler();
        //配置事件處理類
        disruptor.handleEventsWith(eventHandler);
        //啟動disruptor
        disruptor.start();
        //創建事件發佈對象
        translator = new LongEventProducerWithTranslator(disruptor.getRingBuffer());
    }

    /**
     * 停止任務
     */
    @Override
    public void stop() {
        disruptor.shutdown();
        System.out.println("運算結果:" + CommonUtils.get());
        //完成任務
        complete();
    }

    /**
     * 獲取需要執行的任務
     *
     * @return
     */
    @Override
    public Runnable getTask() {
        return () -> {
            publishEvent();
        };
    }

    /**
     * 獲取運行結果
     *
     * @return
     */
    @Override
    public Long getResult() {
        return CommonUtils.get();
    }


    /**
     * 發佈對象
     */
    private void publishEvent() {
        //獲取要通過事件傳遞的業務數據
        Long data = r.nextLong();
        // 發佈事件
        translator.onData(data);
    }


    public static void main(String[] args) {
        DisruptorTest disruptorTest = new DisruptorTest();
        disruptorTest.invok();
    }

}

輸出結果

10:45:22.941 [main] INFO com.heima.task.AbstractTask - 每秒吞吐量:[18171000];(100000000/5503)
ArrayBlockingQueue性能測試代碼
public class ArrayBlockingQueueTest extends AbstractTask {
    private static final Random r = new Random();
    private static final ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue(10000000);


    @Override
    public Runnable monitor() {
        return () -> {
            try {
                for (int i = 0; i < tasksize; i++) {
                    //獲取一個元素
                    queue.take();
                    //執行計算
                    CommonUtils.calculation();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            complete();
        };
    }

    public static void main(String[] args) {
        ArrayBlockingQueueTest test = new ArrayBlockingQueueTest();
        test.invok();
    }

    @Override
    public Runnable getTask() {
        return () -> {
            publish();
        };
    }

    @Override
    public Object getResult() {
        return CommonUtils.get();
    }

    public void publish() {
        Long data = r.nextLong();
        try {
            queue.put(data);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

輸出結果

10:45:46.379 [main] INFO com.heima.task.AbstractTask - 每秒吞吐量:[6192000];(100000000/16148)

3.2 測試對比

測試類 運算次數 耗時(ms) 吞吐量/s
ArrayBlockingQueue 1億次 16148 6192000
Disruptor 1億次 5503 18171000

3.3 Disruptor官方性能測試

Disruptor論文中講述了一個實驗:

  • 這個測試程式調用了一個函數,該函數會對一個64位的計數器迴圈自增5億次。
  • 機器環境:2.4G 6核
  • 運算: 64位的計數器累加5億次
Method Time (ms)
單線程 300
單線程使用 CAS 5,700
單線程使用鎖 10,000
單線程使用volatile 4,700
多線程使用 CAS 30,000
多線程使用鎖 224,000

4. 高性能原理

  • 引入環形的數組結構:數組元素不會被回收,避免頻繁的GC,
  • 無鎖的設計:採用CAS無鎖方式,保證線程的安全性
  • 屬性填充:通過添加額外的無用信息,避免偽共用問題
  • 元素位置的定位:採用跟一致性哈希一樣的方式,一個索引,進行自增

4.1 偽共用概念

4.1.1 電腦緩存構成

​ 下圖是計算的基本結構。L1、L2、L3分別表示一級緩存、二級緩存、三級緩存,越靠近CPU的緩存,速度越快,容量也越小,所以L1緩存很小但很快,並且緊靠著在使用它的CPU內核;L2大一些,也慢一些,並且仍然只能被一個單獨的CPU核使用;L3更大、更慢,並且被單個插槽上的所有CPU核共用;最後是主存,由全部插槽上的所有CPU核共用。

file

​ 當CPU要讀取一個數據時,首先從一級緩存中查找,如果沒有找到再從二級緩存中查找,如果還是沒有就從三級緩存或記憶體中查找。一般來說,每級緩存的命中率大概都在80%左右,也就是說全部數據量的80%都可以在一級緩存中找到,只剩下20%的總數據量才需要從二級緩存、三級緩存或記憶體中讀取,由此可見一級緩存是整個CPU緩存架構中最為重要的部分。

file

下表是一些緩存未命中的消耗數據:

從CPU到 大約需要的CPU周期 大約需要的時間
主存 約60-80ns
QPI匯流排 約20ns
L3 cache 約40-45cycles 約15ns
L2 cache 約10cycles 約3ns
L1 cache 約3-4cycles 約1ns
寄存器 1cycle

可見CPU讀取主存中的數據會比從L1中讀取慢了近2個數量級。

4.1.2 什麼是緩存行

​ 為瞭解決電腦系統中主記憶體與 CPU 之間運行速度差問題,會在 CPU 與主記憶體之間 添加一級或者多級高速緩衝存儲器( Cache)。這個 Cache 一般是被集成到 CPU 內部的, 所以也叫 CPU Cache,如圖所示是兩級 Cache 結構。

file

​ Cache內部是按行存儲的,其中每一行稱為一個cache line,由很多個 Cache line 組成的,Cache line 是 cache 和 RAM 交換數據的最小單位,cache行的大小一般為2的冪次數位元組,通常為 64 Byte。Cache line是Cache與主記憶體進行數據交換的單位。

file

​ 當 CPU 把記憶體的數據載入 cache 時,會把臨近的共 64 Byte 的數據一同放入同一個Cache line,因為空間局部性:臨近的數據在將來被訪問的可能性大。

linux 查看緩存行大小

more /sys/devices/system/cpu/cpu1/cache/index0/coherency_line_size
64
4.1.3 什麼是共用

​ CPU緩存是以緩存行(cache line)為單位存儲的。緩存行通常是 64 位元組,並且它有效地引用主記憶體中的一塊地址。一個 Java 的 long 類型是 8 位元組,因此在一個緩存行中可以存 8 個 long 類型的變數。所以,如果你訪問一個 long 數組,當數組中的一個值被載入到緩存中,它會額外載入另外 7 個,以致你能非常快地遍歷這個數組。事實上,你可以非常快速的遍歷在連續的記憶體塊中分配的任意數據結構。而如果你在數據結構中的項在記憶體中不是彼此相鄰的(如鏈表),你將得不到免費緩存載入所帶來的優勢,並且在這些數據結構中的每一個項都可能會出現緩存未命中。下圖是一個CPU緩存行的示意圖:

file

​ 錶面上 X 和 Y 都是被獨立線程操作的,而且兩操作之間也沒有任何關係。只不過它們共用了一個緩存行,但所有競爭衝突都是來源於共用。

4.1.4 什麼是偽共用

​ 當CPU訪問某一個變數時候,首先會去看CPU Cache內是否有該變數,如果有則直接從中獲取,否者就去主記憶體裡面獲取該變數,然後把該變數所在記憶體區域的一個Cache行大小的記憶體拷貝到Cache(cache行是Cache與主記憶體進行數據交換的單位)。

​ 由於存放到Cache行的的是記憶體塊而不是單個變數,所以可能會把多個變數存放到了一個cache行。當多個線程同時修改一個緩存行裡面的多個變數時候,由於同時只能有一個線程操作緩存行,所以相比每個變數放到一個緩存行性能會有所下降,這就是偽共用。

file

​ 如上圖變數x,y同時被放到了CPU的一級和二級緩存,當線程1使用CPU1對變數x進行更新時候,首先會修改cpu1的一級緩存變數x所在緩存行,這時候緩存一致性協議會導致cpu2中變數x對應的緩存行失效,那麼線程2寫入變數x的時候就只能去二級緩存去查找,這就破壞了一級緩存,而一級緩存比二級緩存更快。更壞的情況下如果cpu只有一級緩存,那麼會導致頻繁的直接訪問主記憶體。

​ 我們的緩存都是以緩存行作為一個單位來處理的,所以失效x的緩存的同時,也會把y失效,反之亦然。

4.1.5 為何會出現偽共用

​ 偽共用的產生是因為多個變數被放入了一個緩存行,並且多個線程同時去寫入緩存行中不同變數。那麼為何多個變數會被放入一個緩存行那。其實是因為Cache與記憶體交換數據的單位就是Cache line,當CPU要訪問的變數沒有在Cache命中時候,根據程式運行的局部性原理會把該變數在記憶體中大小為Cache行的記憶體放如緩存行。

long a;
long b;
long c;
long d;

​ 如上代碼,聲明瞭四個long變數,假設cache line的大小為32個位元組,那麼當cpu訪問變數a時候發現該變數沒有在cache命中,那麼就會去主記憶體把變數a以及記憶體地址附近的b,c,d放入緩存行。也就是地址連續的多個變數才有可能會被放到一個緩存行中,當創建數組時候,數組裡面的多個元素就會被放入到同一個緩存行。那麼單線程下多個變數放入緩存行對性能有影響?其實正常情況下單線程訪問時候由於數組元素被放入到了一個或者多個cache行對代碼執行是有利的,因為數據都在緩存中,代碼執行會更快。

4.1.6 如何解偽共用

​ 解決偽共用最直接的方法就是填充(padding),例如下麵的VolatileLong,一個long占8個位元組,Java的對象頭占用8個位元組(32位系統)或者12位元組(64位系統,預設開啟對象頭壓縮,不開啟占16位元組)。一個緩存行64位元組,那麼我們可以填充6個long(6 * 8 = 48 個位元組)。

4.1.6.1 不使用欄位填充
public class VolatileData {
    // 占用 8個位元組 +48 + 對象頭 = 64位元組

    //需要操作的數據
    volatile long value;

    public VolatileData() {
    }

    public VolatileData(long defValue) {
        value = defValue;
    }

    public long accumulationAdd() {
        //因為單線程操作不需要加鎖
        value++;
        return value;
    }

    public long getValue() {
        return value;
    }
}

記憶體佈局

file

4.6.1.2 填充欄位

因為JDK1.7以後就自動優化代碼會刪除無用的代碼,在JDK1.7以後的版本這些不生效了。

/**
 * 緩存行填充父類
 */
public class DataPadding {
    //填充 5個long類型欄位 8*5 = 40 個位元組
    private long p1, p2, p3, p4, p5; //jvm 優化 刪除無用代碼
    //需要操作的數據
    volatile long value;
}

記憶體佈局

file

4.1.6.3 繼承的方式
/**
 * 緩存行填充父類
 */
public class DataPadding {
    //填充 5個long類型欄位 8*5 = 40 個位元組
    private long p1, p2, p3, p4, p5;
}

繼承緩存填充類

/**
 * 繼承DataPadding
 */
public class VolatileData extends DataPadding {
    // 占用 8個位元組 +48 + 對象頭 = 64位元組

    public VolatileData() {
    }

    public VolatileData(long defValue) {
        value = defValue;
    }

    public long accumulationAdd() {
        //因為單線程操作不需要加鎖
        value++;
        return value;
    }

    public long getValue() {
        return value;
    }
}

記憶體佈局

file

4.1.6.4 Disruptor填充方式
class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
    protected volatile long value;
}

class RhsPadding extends Value {
    protected long p9, p10, p11, p12, p13, p14, p15;
}

繼承填充類

public class VolatileData extends RhsPadding {
    // 占用 8個位元組 +48 + 對象頭 = 64位元組

    //需要操作的數據
    volatile long value;

    public VolatileData() {
    }

    public VolatileData(long defValue) {
        value = defValue;
    }

    public long accumulationAdd() {
        //因為單線程操作不需要加鎖
        value++;
        return value;
    }

    public long getValue() {
        return value;
    }
}

記憶體佈局

file

4.1.6.5 @Contended註解
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.TYPE})
public @interface Contended {
    String value() default "";
}

註解填充類

@Contended
public class VolatileData  {
    // 占用 8個位元組 +48 + 對象頭 = 64位元組

    //需要操作的數據
    volatile long value;
    
    public VolatileData() {
    }

    public VolatileData(long defValue) {
        value = defValue;
    }

    public long accumulationAdd() {
        //因為單線程操作不需要加鎖
        value++;
        return value;
    }

    public long getValue() {
        return value;
    }
}

記憶體佈局

file

註意事項

​ 在Java8中提供了@sun.misc.Contended來避免偽共用時,在運行時需要設置JVM啟動參數-XX:-RestrictContended否則可能不生效。

4.1.7 性能對比
4.1.7.1 測試代碼

使用和不使用緩存行填充的對比

/**
 * 緩存行測試
 */
public class CacheLineTest {
    /**
     * 通過緩存行填充的變數
     */
    private VolatileData volatileData1 = new VolatileData(0);
    private VolatileData volatileData2 = new VolatileData(0);
    private VolatileData volatileData3 = new VolatileData(0);
    private VolatileData volatileData4 = new VolatileData(0);
    private VolatileData volatileData5 = new VolatileData(0);
    private VolatileData volatileData6 = new VolatileData(0);
    private VolatileData volatileData7 = new VolatileData(0);

    /**
     * 迴圈次數
     */
    private final long size = 100000000;

    /**
     * 進行累加操作
     */
    public void accumulationX(VolatileData volatileData) {
        //計算耗時
        long currentTime = System.currentTimeMillis();
        long value = 0;
        //迴圈累加
        for (int i = 0; i < size; i++) {
            //使用緩存行填充的方式
            value = volatileData.accumulationAdd();


        }
        //列印
        System.out.println(value);
        //列印耗時
        System.out.println("耗時:" + (System.currentTimeMillis() - currentTime));
    }


    public static void main(String[] args) {
        //創建對象
        CacheLineTest cacheRowTest = new CacheLineTest();
        //創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        //啟動三個線程個調用他們各自的方法
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData1));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData2));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData3));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData4));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData5));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData6));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData7));
        executorService.shutdown();
    }
}
4.1.7.2 測試數據

同樣的結構他們之間差了 將近 50倍的速度差距

對象 NoPadding(MS) DataPadding(MS) RhsPadding(MS) Contended(MS)
volatileData1 3751 1323 1307 1291
volatileData2 3790 1383 1311 1314
volatileData3 7551 1400 1311 1333
volatileData4 7669 1407 1317 1356
volatileData5 8577 1447 1327 1361
volatileData6 8705 1479 1339 1375
volatileData6 8741 1512 1368 1389
4.1.8 Disruptor解決偽共用

​ 在Disruptor中有一個重要的類Sequence,該類包裝了一個volatile修飾的long類型數據value,無論是Disruptor中的基於數組實現的緩衝區RingBuffer,還是生產者,消費者,都有各自獨立的Sequence,RingBuffer緩衝區中,Sequence標示著寫入進度,例如每次生產者要寫入數據進緩衝區時,都要調用RingBuffer.next()來獲得下一個可使用的相對位置。對於生產者和消費者來說,Sequence標示著它們的事件序號,來看看Sequence類的源碼:

class LhsPadding {
	protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
	protected volatile long value;
}

class RhsPadding extends Value {
	protected long p9, p10, p11, p12, p13, p14, p15;
}

public class Sequence extends RhsPadding {
	static final long INITIAL_VALUE = -1L;
	private static final Unsafe UNSAFE;
	private static final long VALUE_OFFSET;
	static {
		UNSAFE = Util.getUnsafe();
		try {
			VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
		} catch(final Exception e) {
			 throw new RuntimeException(e);
		}
	}
	


    public Sequence() {
        this(INITIAL_VALUE);
    }

    public Sequence(final long initialValue) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
    }

}

​ 從第1到11行可以看到,真正使用到的變數value,它的前後空間都由8個long型的變數填補了,對於一個大小為64位元組的緩存行,它剛好被填補滿(一個long型變數value,8個位元組加上前/後個7long型變數填補,7*8=56,56+8=64位元組)。這樣做每次把變數value讀進高速緩存中時,都能把緩存行填充滿(對於大小為64個位元組的緩存行來說,如果緩存行大小大於64個位元組,那麼還是會出現偽共用問題),保證每次處理數據時都不會與其他變數發生衝突。

4.2 無鎖的設計

4.2.1 鎖機制存在的問題
  • 在多線程競爭下,加鎖、釋放鎖會導致比較多的上下文切換和調度延時,引起性能問題,而且在上下文切換的時候,cpu之前緩存的指令和數據都將失效,對性能有很大的損失,用戶態的鎖雖然避免了這些問題,但是其實它們只是在沒有真實的競爭時才有效。

  • 一個線程持有鎖會導致其它所有需要此鎖的線程掛起直至該鎖釋放。

  • 如果一個優先順序高的線程等待一個優先順序低的線程釋放鎖會導致導致優先順序反轉(Priority Inversion),引起性能風險。

4.2.2 CAS無鎖演算法

​ 實現無鎖(lock-free)的非阻塞演算法有多種實現方法,其中 CAS(比較與交換,Compare and swap) 是一種有名的無鎖演算法。CAS的語義是“我認為V的值應該為A,如果是,那麼將V的值更新為B,否則不修改並告訴V的值實際為多少”,CAS是一種 樂觀鎖 技術,當多個線程嘗試使用CAS同時更新同一個變數時,只有其中一個線程能更新變數的值,而其它線程都失敗,失敗的線程並不會被掛起,而是被告知這次競爭中失敗,並可以再次嘗試。CAS有3個操作數,記憶體值V,舊的預期值A,要修改的新值B。當且僅當預期值A和記憶體值V相同時,將記憶體值V修改為B,否則什麼都不做。

​ 這是一個CPU級別的指令,在我的意識中,它的工作方式有點像樂觀鎖——CPU去更新一個值,但如果想改的值不再是原來的值,操作就失敗,因為很明顯,有其它操作先改變了這個值。

file

註意,這可以是CPU的兩個不同的核心,但不會是兩個獨立的CPU。

​ CAS操作比鎖消耗資源少的多,因為它們不牽涉操作系統,它們直接在CPU上操作。但它們並非沒有代價——在上面的試驗中,單線程無鎖耗時300ms,單線程有鎖耗時10000ms,單線程使用CAS耗時5700ms。所以它比使用鎖耗時少,但比不需要考慮競爭的單線程耗時多。

4.2.3 傳統隊列問題

隊列的底層數據結構一般分成三種:數組、鏈表和堆。其中,堆這裡是為了實現帶有優先順序特性的隊列,暫且不考慮。

隊列 有界性 數據結構
ArrayBlockingQueue bounded 加鎖 arraylist
LinkedBlockingQueue optionally-bounded 加鎖 linkedlist
ConcurrentLinkedQueue unbounded 無鎖 linkedlist
LinkedTransferQueue unbounded 無鎖 linkedlist
PriorityBlockingQueue unbounded 加鎖 heap
DelayQueue unbounded 加鎖 heap

​ 在穩定性和性能要求特別高的系統中,為了防止生產者速度過快,導致記憶體溢出,只能選擇有界隊列;

​ 同時,為了減少Java的垃圾回收對系統性能的影響,會儘量選擇array/heap格式的數據結構。這樣篩選下來,符合條件的隊列就只有ArrayBlockingQueue,但是ArrayBlockingQueue是通過加鎖的方式保證線程安全,而且ArrayBlockingQueue還存在偽共用問題,這兩個問題嚴重影響了性能。

4.2.3.1 Disruptor的無鎖設計

​ 多線程環境下,多個生產者通過do/while迴圈的條件CAS,來判斷每次申請的空間是否已經被其他生產者占據。假如已經被占據,該函數會返回失敗,While迴圈重新執行,申請寫入空間。

do
{
    current = cursor.get();
    next = current + n;

    if (!hasAvailableCapacity(gatingSequences, n, current))
    {
        throw InsufficientCapacityException.INSTANCE;
    }
}
while (!cursor.compareAndSet(current, next));
//next 類比於ArrayBlockQueue的數組索引index
return next;

4.3 環形數組結構

環形數組結構是整個Disruptor的核心所在。

4.3.1 什麼是環形數組

​ RingBuffer 是一個環(首尾相連的環),用做在不同上下文(線程)間傳遞數據的buffer,RingBuffer 擁有一個序號,這個序號指向數組中下一個可用元素。

file

4.3.2 為什麼使用環形數組

為了避免垃圾回收,採用數組而非鏈表。同時,數組對處理器的緩存機制更加友好

​ 首先因為是數組,所以要比鏈表快,而且根據我們對上面緩存行的解釋知道,數組中的一個元素載入,相鄰的數組元素也是會被預載入的,因此在這樣的結構中,cpu無需時不時去主存載入數組中的下一個元素。

​ 而且,你可以為數組預先分配記憶體,使得數組對象一直存在(除非程式終止)。這就意味著不需要花大量的時間用於垃圾回收。

​ 此外,不像鏈表那樣,需要為每一個添加到其上面的對象創造節點對象—對應的,當刪除節點時,需要執行相應的記憶體清理操作。環形數組中的元素採用覆蓋方式,避免了jvm的GC。

​ 其次結構作為環形,數組的大小為2的n次方,這樣元素定位可以通過位運算效率會更高,這個跟一致性哈希中的環形策略有點像。在disruptor中,這個牛逼的環形結構就是RingBuffer,既然是數組,那麼就有大小,而且這個大小必須是2的n次方,結構如下:

file

​ 其實質只是一個普通的數組,只是當放置數據填充滿隊列(即到達2^n-1位置)之後,再填充數據,就會從0開始,覆蓋之前的數據,於是就相當於一個環。

4.4 元素位置定位

​ 數組長度2^n,通過位運算,加快定位的速度。下標採取遞增的形式。不用擔心index溢出的問題。index是long類型,即使100萬QPS的處理速度,也需要30萬年才能用完。

4.5 等待策略

​ 定義 Consumer 如何進行等待下一個事件的策略。 (註:Disruptor 定義了多種不同的策略,針對不同的場景,提供了不一樣的性能表現)根據實際運行環境的 CPU 的硬體特點選擇恰當的策略,並配合特定的 JVM 的配置參數,能夠實現不同的性能提升。

4.5.1 BlockingWaitStrategy

​ Disruptor的預設策略是BlockingWaitStrategy,在BlockingWaitStrategy內部是使用鎖和condition來控制線程的喚醒

​ BlockingWaitStrategy是最低效的策略,但其對CPU的消耗最小並且在各種不同部署環境中能提供更加一致的性能表現。

4.5.2 SleepingWaitStrategy

​ SleepingWaitStrategy 的性能表現跟 BlockingWaitStrategy 差不多,對 CPU 的消耗也類似,但其對生產者線程的影響最小,通過使用LockSupport.parkNanos(1)來實現迴圈等待,適合用於非同步日誌類似的場景;

4.5.3 YieldingWaitStrategy

​ YieldingWaitStrategy是可以使用在低延遲系統的策略之一,YieldingWaitStrategy將自旋以等待序列增加到適當的值。在迴圈體內,將調用Thread.yield()以允許其他排隊的線程運行。在要求極高性能且事件處理線數小於 CPU 邏輯核心數的場景中,推薦使用此策略;

4.5.4 BusySpinWaitStrategy

​ 性能最好,適合用於低延遲的系統。在要求極高性能且事件處理線程數小於CPU邏輯核心數的場景中,推薦使用此策略;

4.5.5 PhasedBackoffWaitStrategy

​ 自旋 + yield + 自定義策略,CPU資源緊缺,吞吐量和延遲並不 的場景。

本文由傳智教育博學谷教研團隊發佈。

如果本文對您有幫助,歡迎關註點贊;如果您有任何建議也可留言評論私信,您的支持是我堅持創作的動力。

轉載請註明出處!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 我的gRPC之旅。使用gRPC一元通信模式和雙向流通信模式寫一個簡單的控制台聊天室。實現創建用戶和實時聊天兩個功能,不考慮高性能。複習了記憶體同步訪問Sync包的使用。用切片緩存聊天記錄,新用戶可以同步聊天記錄。 ...
  • Java基礎知識 Java的三種版本 JavaSE :標準版,主要用於開發桌面程式,控制台開發等等 JavaME:嵌入式開發,主要用於開發手機,小家電等等,目前使用的比較少 JavaEE:企業級開發,主要用於web端開發,伺服器開發等等,是使用十分廣泛的,學好這部分就要學好JavaSE JDK、JR ...
  • 來源:liuchenyang0515.blog.csdn.net/article/details/109263510 對稱加密 兩邊用同一個密鑰來加解密。 A把明文通過某一演算法加密之後得到密文,然後把密文發送給B,B接收到密文之後用相同的密鑰執行相同的演算法去解密。X沒有密鑰,即使竊取到密文也無法竊聽 ...
  • 多用戶即時通訊系統01 1.項目開發流程 2.需求分析 用戶登錄 拉取線上用戶列表 無異常退出(包括客戶端和服務端) 私聊 群聊 發文件 伺服器推送新聞/廣播 3.設計階段 3.1界面設計 用戶登錄: 拉取線上用戶列表: 私聊: 群聊: 發文件: 文件伺服器推送新聞: 3.2通訊系統整體設計 總結: ...
  • 實時展示用戶上傳的頭像 總體思路 """ 1.首先需要給對應的上傳頭像input框綁定一個文本域變化事件 (當檢測到用戶對該文件框上傳了頭像就會觸發一系列操作) 2.再生成一個文件閱讀器對象 3.再獲取用戶上傳的文件頭像 4.把用戶上傳的文件頭像交給文件閱讀器對象FileReader讀取 5.利用文 ...
  • 1 垃圾收集三件事 哪些記憶體需要回收:死去的對象需要回收 什麼時候回收 如何回收 按照jvm記憶體區域劃分原則:程式計數器、虛擬機棧、本地方法棧3個區域的記憶體隨線程創建而劃分,因此線程結束時,記憶體也自動釋放。 本章節分析的是Java堆和方法區的記憶體管理策略 1、虛擬機棧、本地方法棧,棧中的棧幀隨著方法 ...
  • Python中,要想知道一個字元串有多少個字元(獲得字元串長度),或者一個字元串占用多少個位元組,可以使用len()函數。 語法格式: len(string) string 用於指定要進行長度統計的字元串 示例: a = 'www.baidu.com' print(len(a)) 輸出 13 在 Py ...
  • 二、散點圖 import seaborn as sns import matplotlib.pyplot as plt sns.set_theme(style = 'whitegrid') # 載入 diamonds 數據集 diamonds = sns.load_dataset('diamonds ...
一周排行
    -Advertisement-
    Play Games
  • 一:背景 1.講故事 在分析的眾多dump中,經常會遇到各種奇葩的問題,僅通過dump這種快照形式還是有很多問題搞不定,而通過 perfview 這種粒度又太粗,很難找到問題之所在,真的很頭疼,比如本篇的 短命線程 問題,參考圖如下: 我們在 t2 時刻抓取的dump對查看 短命線程 毫無幫助,我根 ...
  • 在日常後端Api開發中,我們跟前端的溝通中,通常需要協商好入參的數據類型,和參數是通過什麼方式存在於請求中的,是表單(form)、請求體(body)、地址欄參數(query)、還是說通過請求頭(header)。 當協商好後,我們的介面又需要怎麼去接收這些數據呢?很多小伙伴可能上手就是直接寫一個實體, ...
  • 許多情況下我們需要用到攝像頭獲取圖像,進而處理圖像,這篇博文介紹利用pyqt5、OpenCV實現用電腦上連接的攝像頭拍照並保存照片。為了使用和後續開發方便,這裡利用pyqt5設計了個相機界面,後面將介紹如何實現,要點包括界面設計、邏輯實現及完整代碼。 ...
  • 思路分析 註冊頁面需要對用戶提交的數據進行校驗,並且需要對用戶輸入錯誤的地方進行提示! 所有我們需要使用forms組件搭建註冊頁面! 平時我們書寫form是組件的時候是在views.py裡面書寫的, 但是為了接耦合,我們需要將forms組件都單獨寫在一個地方,需要用的時候導入就行! 例如,在項目文件 ...
  • 思路分析 登錄頁面,我們還是採用ajax的方式提交用戶數據 唯一需要學習的是如何製作圖片驗證碼! 具體的登錄頁面效果圖如下: 如何製作圖片驗證碼 推導步驟1:在img標簽的src屬性里放上驗證碼的請求路徑 補充1.img的src屬性: 1.圖片路徑 2.url 3.圖片的二進位數據 補充2:字體樣式 ...
  • 哈嘍,兄弟們! 最近有許多小伙伴都在吐槽打工好難。 每天都是執行許多重覆的任務 例如閱讀新聞、發郵件、查看天氣、打開書簽、清理文件夾等等, 使用自動化腳本,就無需手動一次又一次地完成這些任務, 非常方便啊有木有?! 而在某種程度上,Python 就是自動化的代名詞。 今天就來和大家一起學習一下, 用 ...
  • 作者:IT王小二 博客:https://itwxe.com 前面小二介紹過使用Typora+PicGo+LskyPro打造舒適寫作環境,那時候需要使用水印功能,但是小二在升級LskyPro2.x版本發現有很多不如人意的東西,遂棄用LskyPro使用MinIO結合代碼實現自己需要的圖床功能,也適合以後 ...
  • OpenAI Gym是一款用於研發和比較強化學習演算法的工具包,本文主要介紹Gym模擬環境的功能和工具包的使用方法,並詳細介紹其中的經典控制問題中的倒立擺(CartPole-v0/1)問題。最後針對倒立擺問題如何建立控制模型並採用爬山演算法優化進行了介紹,並給出了相應的完整python代碼示例和解釋。要... ...
  • python爬蟲瀏覽器偽裝 #導入urllib.request模塊 import urllib.request #設置請求頭 headers=("User-Agent","Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, l ...
  • 前端代碼搭建 主要利用的是bootstrap3中js插件里的模態框版塊 <li><a href="" data-toggle="modal" data-target=".bs-example-modal-lg">修改密碼</a></li> <div class="modal fade bs-exam ...