Disruptor-源碼解讀

来源:https://www.cnblogs.com/konghuanxi/archive/2023/04/17/17324988.html
-Advertisement-
Play Games

前言 Disruptor的高性能,是多種技術結合以及本身架構的結果。本文主要講源碼,涉及到的相關知識點需要讀者自行去瞭解,以下列出: 鎖和CAS 偽共用和緩存行 volatile和記憶體屏障 原理 此節結合demo來看更容易理解:傳送門 下圖來自官方文檔 官方原圖有點亂,我翻譯一下 在講原理前,先瞭解 ...


前言

Disruptor的高性能,是多種技術結合以及本身架構的結果。本文主要講源碼,涉及到的相關知識點需要讀者自行去瞭解,以下列出:

  • 鎖和CAS
  • 偽共用和緩存行
  • volatile和記憶體屏障

原理

此節結合demo來看更容易理解:傳送門

下圖來自官方文檔

Untitled

官方原圖有點亂,我翻譯一下

Untitled

在講原理前,先瞭解 Disruptor 定義的術語

  • Event

    存放數據的單位,對應 demo 中的 LongEvent

  • Ring Buffer

    環形數據緩衝區:這是一個首尾相接的環,用於存放 Event ,用於生產者往其存入數據和消費者從其拉取數據

  • Sequence

    序列:用於跟蹤進度(生產進度、消費進度)

  • Sequencer

    Disruptor的核心,用於在生產者和消費者之間傳遞數據,有單生產者和多生產者兩種實現。

  • Sequence Barrier

    序列屏障,消費者之間的依賴關係就靠序列屏障實現

  • Wait Strategy

  • 等待策略,消費者等待生產者將發佈的策略

  • Event Processor

    事件處理器,迴圈從 RingBuffer 獲取 Event 並執行 EventHandler。

  • Event Handler

    事件處理程式,也就是消費者

  • Producer

    生產者

Ring Buffer

環形數據緩衝區(RingBuffer),邏輯上是首尾相接的環,在代碼中用數組來表示Object[]。Disruptor生產者發佈分兩步

  • 步驟一:申請寫入 n 個元素,如果可以寫入,這返回最大序列號
  • 步驟二:根據序列號去 RingBuffer 中獲取 Event,修改併發布
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
// 獲取下一個可用位置的下標(步驟1)
long sequence = ringBuffer.next();
try {
    // 返回可用位置的元素
    LongEvent event = ringBuffer.get(sequence);
    // 設置該位置元素的值
    event.set(l);
} finally {
    // 發佈
    ringBuffer.publish(sequence);
}

這兩個步驟由 Sequencer 完成,分為單生產者和多生產者實現

Sequencer

單生產者

如果申請 2 個元素,則如下圖所示(圓表示 RingBuffer)

// 一般不會有以下寫法,這裡為了講解源碼才使用next(2)
// 向RingBuffer申請兩個元素
long sequence = ringBuffer.next(2);
for (long i =  sequence-1; i <= sequence; i++) {
    try {
        // 返回可用位置的元素
        LongEvent event = ringBuffer.get(i);
        // 設置該位置元素的值
        event.set(1);
    } finally {
        ringBuffer.publish(i);
    }
}

Untitled

next 申請成功的序列,cursor 消費者最大可用序列,gatingSequence 表示能申請的最大序列號。紅色表示待發佈,綠色表示已發佈。申請相當於占位置,發佈需要一個一個按順序發佈

如果 RingBuffer 滿了呢,在上圖步驟二的基礎上,生產者發佈了3個元素,消費者消費1個。此時生產者再申請 2個元素,就會變成下圖所示

Untitled

只剩下 1 個空間,但是要申請 2個元素,此時程式會自旋等待空間足夠。

接下來結合代碼看,單生產者的 Sequencer 實現為 SingleProducerSequencer,先看看構造方法

abstract class SingleProducerSequencerPad extends AbstractSequencer
{
    protected long p1, p2, p3, p4, p5, p6, p7;

    SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy)
    {
        super(bufferSize, waitStrategy);
    }
}

abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad
{
    SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy)
    {
        super(bufferSize, waitStrategy);
    }

    long nextValue = Sequence.INITIAL_VALUE;
    long cachedValue = Sequence.INITIAL_VALUE;
}

public final class SingleProducerSequencer extends SingleProducerSequencerFields
{
    protected long p1, p2, p3, p4, p5, p6, p7;

    public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy)
    {
        super(bufferSize, waitStrategy);
    }
}

這是 Disruptor 高性能的技巧之一,SingleProducerSequencer 需要的類變數只有 nextValue 和cachedValue,p1 ~ p7 的作用是填充緩存行,這能保證 nextValue 和cachedValue 必定在獨立的緩存行,我們可以用ClassLayout列印記憶體佈局看看

Untitled

接下來看如何獲取序列號(也就是步驟一)

// 調用路徑
// RingBuffer#next()
// SingleProducerSequencer#next()
public long next(int n)
{
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }

    long nextValue = this.nextValue;

    //生產者當前序號值+期望獲取的序號數量後達到的序號值
    long nextSequence = nextValue + n;
    //減掉RingBuffer的總的buffer值,用於判斷是否出現‘覆蓋’
    long wrapPoint = nextSequence - bufferSize;
    //從後面代碼分析可得:cachedValue就是緩存的消費者中最小序號值,他不是當前最新的‘消費者中最小序號值’,而是上次程式進入到下麵的if判定代碼段時,被賦值的當時的‘消費者中最小序號值’
    //這樣做的好處在於:在判定是否出現覆蓋的時候,不用每次都調用getMininumSequence計算‘消費者中的最小序號值’,從而節約開銷。只要確保當生產者的節奏大於了緩存的cachedGateingSequence一個bufferSize時,從新獲取一下 getMinimumSequence()即可。
    long cachedGatingSequence = this.cachedValue;

    //(wrapPoint > cachedGatingSequence) : 當生產者已經超過上一次緩存的‘消費者中最小序號值’(cachedGatingSequence)一個‘Ring’大小(bufferSize),需要重新獲取cachedGatingSequence,避免當生產者一直在生產,但是消費者不再消費的情況下,出現‘覆蓋’
    //(cachedGatingSequence > nextValue) : https://github.com/LMAX-Exchange/disruptor/issues/76
    // 這裡判斷就是生產者生產的填滿BingBUffer,需要等待消費者消費
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
    {
        cursor.setVolatile(nextValue);  // StoreLoad fence

        //gatingSequences就是消費者隊列末尾的序列,也就是消費者消費到哪裡了
        //實際上就是獲得處理的隊尾,如果隊尾是current的話,說明所有的消費者都執行完成任務在等待新的事件了
        long minSequence;
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
        {
            // 等待1納秒
            LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
        }

        this.cachedValue = minSequence;
    }

    this.nextValue = nextSequence;

    return nextSequence;
}

public void publish(long sequence)
{
    // 更新序列號
    cursor.set(sequence);
    // 等待策略的喚醒
    waitStrategy.signalAllWhenBlocking();
}

要解釋的都在註釋里了,gatingSequences 是消費者隊列末尾的序列,對應著就是下圖中的 ApplicationConsumer 的 Sequence

Untitled

多生產者

看完單生產者版,接下來看多生產者的實現。因為是多生產者,需要考慮併發的情況。

如果有A、B兩個消費者都來申請 2 個元素

Untitled

cursor 申請成功的序列,HPS 消費者最大可用序列,gatingSequence 表示能申請的最大序列號。紅色表示待發佈,綠色表示已發佈。HPS 是我自己編的縮寫,表示 getHighestPublishedSequence 方法的返回值

如圖所示,只要申請成功,就移動 cursor 的位置。RingBuffer 並沒有記錄發佈情況(圖中的紅綠顏色),這個發佈情況由 MultiProducerSequenceravailableBuffer 來維護。

下麵看代碼

public final class MultiProducerSequencer extends AbstractSequencer
{
    // 緩存的消費者中最小序號值,相當於SingleProducerSequencerFields的cachedValue
    private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    // 標記元素是否可用
    private final int[] availableBuffer;

    public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        long current;
        long next;

        do
        {
            current = cursor.get();
            next = current + n;

            //減掉RingBuffer的總的buffer值,用於判斷是否出現‘覆蓋’
            long wrapPoint = next - bufferSize;
            //從後面代碼分析可得:cachedValue就是緩存的消費者中最小序號值,他不是當前最新的‘消費者中最小序號值’,而是上次程式進入到下麵的if判定代碼段時,被賦值的當時的‘消費者中最小序號值’
            //這樣做的好處在於:在判定是否出現覆蓋的時候,不用每次都調用getMininumSequence計算‘消費者中的最小序號值’,從而節約開銷。只要確保當生產者的節奏大於了緩存的cachedGateingSequence一個bufferSize時,從新獲取一下 getMinimumSequence()即可。
            long cachedGatingSequence = gatingSequenceCache.get();

            //(wrapPoint > cachedGatingSequence) : 當生產者已經超過上一次緩存的‘消費者中最小序號值’(cachedGatingSequence)一個‘Ring’大小(bufferSize),需要重新獲取cachedGatingSequence,避免當生產者一直在生產,但是消費者不再消費的情況下,出現‘覆蓋’
            //(cachedGatingSequence > nextValue) : https://github.com/LMAX-Exchange/disruptor/issues/76
            // 這裡判斷就是生產者生產的填滿BingBUffer,需要等待消費者消費
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
            {
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

                if (wrapPoint > gatingSequence)
                {
                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                    continue;
                }

                gatingSequenceCache.set(gatingSequence);
            }
            // 使用cas保證只有一個生產者能拿到next
            else if (cursor.compareAndSet(current, next))
            {
                break;
            }
        }
        while (true);

        return next;
    }
......
}

MultiProducerSequencerSingleProducerSequencer的 next()方法邏輯大致一樣,只是多了CAS的步驟來保證併發的正確性。接著看發佈方法

public void publish(final long sequence)
{
    // 記錄發佈情況
    setAvailable(sequence);
    // 等待策略的喚醒
    waitStrategy.signalAllWhenBlocking();
}

private void setAvailable(final long sequence)
{
    // calculateIndex(sequence):獲取序號
    // calculateAvailabilityFlag(sequence):RingBuffer的圈數
    setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}

private void setAvailableBufferValue(int index, int flag)
{
    long bufferAddress = (index * SCALE) + BASE;
    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
    // 上面相當於 availableBuffer[index] = flag 的高性能版
}

記錄發佈情況,其實相當於 availableBuffer[sequence] = 圈數,前面說了,availableBuffer是用來標記元素是否可用的,如果消費者的圈數 ≠ availableBuffer中的圈數,則表示元素不可用

public boolean isAvailable(long sequence)
{
    int index = calculateIndex(sequence);
    // 計算圈數
    int flag = calculateAvailabilityFlag(sequence);
    long bufferAddress = (index * SCALE) + BASE;
    // UNSAFE.getIntVolatile(availableBuffer, bufferAddress):相當於availableBuffer[sequence] 的高性能版
    return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
}

private int calculateAvailabilityFlag(final long sequence)
{
    // 相當於 sequence % bufferSize ,但是位操作更快
    return (int) (sequence >>> indexShift);
}

isAvailable() 方法判斷元素是否可用,此方法的調用堆棧看完消費者就清楚了。

消費者

本小節介紹兩個方面,一是 Disruptor 的消費者如何實現依賴關係的,二是消費者如何拉取消息並消費

消費者的依賴關係實現

Untitled

我們看回這張圖,每個消費者前都有一個 SequenceBarrier ,這就是消費者之間能實現依賴的關鍵。每個消費者都有一個 Sequence,表示自身消費的進度,如圖中,ApplicationConsumer 的 SequenceBarrier 就持有 ReplicaionConsumer 和 JournalConsumer 的 Sequence,這樣就能控制 ApplicationConsumer 的消費進度不超過其依賴的消費者。

下麵看源碼,這是 disruptor 配置消費者的代碼。

EventHandler journalConsumer = xxx;
EventHandler replicaionConsumer = xxx;
EventHandler applicationConsumer = xxx;

disruptor.handleEventsWith(journalConsumer, replicaionConsumer)
        .then(applicationConsumer);

// 下麵兩行等同於上面這行
// disruptor.handleEventsWith(journalConsumer, replicaionConsumer);
// disruptor.after(journalConsumer, replicaionConsumer).then(applicationConsumer);

先看ReplicaionConsumer 和 JournalConsumer 的配置 disruptor.handleEventsWith(journalConsumer, replicaionConsumer)

/** 代碼都在Disruptor類 **/

public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
    // 沒有依賴的消費者就創建新的Sequence
    return createEventProcessors(new Sequence[0], handlers);
}

/**
 * 創建消費者
 * @param barrierSequences 當前消費者組的屏障序列數組,如果當前消費者組是第一組,則取一個空的序列數組;否則,barrierSequences就是上一組消費者組的序列數組
 * @param eventHandlers 事件消費邏輯的EventHandler數組
 */
EventHandlerGroup<T> createEventProcessors(
    final Sequence[] barrierSequences,
    final EventHandler<? super T>[] eventHandlers)
{
    checkNotStarted();

    // 對應此事件處理器組的序列組
    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
    {
        final EventHandler<? super T> eventHandler = eventHandlers[i];

        // 創建消費者,註意這裡傳入了SequenceBarrier
        final BatchEventProcessor<T> batchEventProcessor =
            new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

        if (exceptionHandler != null)
        {
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }

        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }

    // 每次添加完事件處理器後,更新門控序列,以便後續調用鏈的添加
    // 所謂門控,就是RingBuffer要知道在消費鏈末尾的那組消費者(也是最慢的)的進度,避免消息未消費就被寫入覆蓋
    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

    return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}

createEventProcessors() 方法主要做了3件事,創建消費者、保存eventHandler和消費者的映射關係、更新 gatingSequences

  • EventProcessor 是消費者
  • SequenceBarrier 是消費者屏障,保證了消費者的依賴關係
  • consumerRepository 保存了eventHandler和消費者的映射關係

gatingSequences 我們在前面說過,生產者通過 gatingSequences 知道消費者的進度,防止生產過快導致消息被覆蓋,更新操作在 updateGatingSequencesForNextInChain() 方法中

// 為消費鏈下一組消費者,更新門控序列
// barrierSequences是上一組事件處理器組的序列(如果本次是第一次,則為空數組),本組不能超過上組序列值
// processorSequences是本次要設置的事件處理器組的序列
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
{
    if (processorSequences.length > 0)
    {
        // 將本組序列添加到Sequencer中的gatingSequences中
        ringBuffer.addGatingSequences(processorSequences);
        // 將上組消費者的序列從gatingSequences中移除
        for (final Sequence barrierSequence : barrierSequences)
        {
            ringBuffer.removeGatingSequence(barrierSequence);
        }
        // 取消標記上一組消費者為消費鏈末端
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }
}

讓我們把視線再回到消費者的設置方法

disruptor.handleEventsWith(journalConsumer, replicaionConsumer)
        .then(applicationConsumer);

journalConsumer 和 replicaionConsumer 已經設置了,接下來是 applicationConsumer

/** 代碼在EventHandlerGroup類 **/

public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers)
{
    return handleEventsWith(handlers);
}

public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
    return disruptor.createEventProcessors(sequences, handlers);
}

可以看到,設置 applicationConsumer 最終調用的也是 createEventProcessors() 方法,區別就在於 createEventProcessors() 方法的第一個參數,這裡的 sequences 就是 journalConsumer 和 replicaionConsumer 這兩個消費者的 Sequence

消費者的消費邏輯

消費者的主要消費邏輯在 EventProcessor#run()方法中,下麵以BatchEventProcessor舉例

// BatchEventProcessor#run()
// BatchEventProcessor#processEvents()
private void processEvents()
{
    T event = null;
    long nextSequence = sequence.get() + 1L;

    while (true)
    {
        try
        {
            // 獲取最大可用序列
            final long availableSequence = sequenceBarrier.waitFor(nextSequence);
            ...

            // 執行消費邏輯
            while (nextSequence <= availableSequence)
            {
                // dataProvider就是RingBuffer
                event = dataProvider.get(nextSequence);
                eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                nextSequence++;
            }

            sequence.set(availableSequence);
        }
        catch ()
        {
            // 異常處理
        }
    }
}

方法簡潔明瞭,在死迴圈中通過 sequenceBarrier 獲取最大可用序列,然後從 RingBuffer 中獲取 Event 並調用 EventHandler 進行消費。重點在 sequenceBarrier.waitFor(nextSequence); 中

public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
{
    checkAlert();
    // 獲取可用的序列,這裡返回的是Sequencer#next方法設置成功的可用下標,不是Sequencer#publish
    // cursorSequence:生產者的最大可用序列
    // dependentSequence:依賴的消費者的最大可用序列
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

    if (availableSequence < sequence)
    {
        return availableSequence;
    }
    // 獲取最大的已發佈成功的序號(對於發佈是否成功的校驗在此方法中)
    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}

熟悉的 getHighestPublishedSequence() 方法,忘了就回去看看生產者小節。waitStrategy.waitFor() 對應著圖片中的 waitFor() 。

消費者的啟動

前面講了消費者的處理邏輯,但是 BatchEventProcessor#run() 是如何被調用的呢,關鍵在於disruptor.start();

// Disruptor#start()
public RingBuffer<T> start()
{
    checkOnlyStartedOnce();
    for (final ConsumerInfo consumerInfo : consumerRepository)
    {
        consumerInfo.start(executor);
    }

    return ringBuffer;
}

class EventProcessorInfo<T> implements ConsumerInfo
{
    public void start(final Executor executor)
    {
        // eventprocessor就是消費者
        executor.execute(eventprocessor);
    }
}

還記得 consumerRepository嗎,沒有就往上翻翻設置消費者那裡的 disruptor.handleEventsWith() 方法。

所以啟動過程就是
disruptor#start() → ConsumerInfo#start() → Executor#execute() → EventProcessor#run()

課後作業:Disruptor 的消費者使用了多少線程?

總結

本文講了 Disruptor 大體邏輯和源碼,當然其高性能的秘訣不止文中描述的那些。還有不同的等待策略,Sequence 中使用Unsafe而不是JDK中的 Atomic 原子類等等。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 源代碼下載鏈接: 一、賓館客房管理系統開發初衷 隨著互聯網技術的迅速發展,電腦技術的普及以及信息化時代的推波助瀾,賓館客房需求的逐漸增大,這也是挑戰了賓館客房管理方面的技術,以前的人工管理方式已經不再適應現在的環境,取而代之的是先進的賓館客房管理系統,提高了賓館的工作效率,為想要入住賓館的人提供更 ...
  • 代碼如下: import com.google.zxing.BarcodeFormat; import com.google.zxing.EncodeHintType; import com.google.zxing.MultiFormatWriter; import com.google.zxin ...
  • 併發工具類 通常我們所說的併發包也就是java.util.concurrent(JUC),集中了Java併發的各種工具類, 合理地使用它們能幫忙我們快速地完成功能 。 作者: 博學谷狂野架構師 GitHub:GitHub地址 (有我精心準備的130本電子書PDF) 只分享乾貨、不吹水,讓我們一起加油 ...
  • 好消息:與上題的Emergency是同樣的方法。壞消息:又錯了&&c++真的比c方便太多太多。 A family hierarchy is usually presented by a pedigree tree. Your job is to count those family members ...
  • 安裝Zookeeper和Kafka集群 本文介紹如何安裝Zookeeper和Kafka集群。為了方便,介紹的是在一臺伺服器上的安裝,實際應該安裝在多台伺服器上,但步驟是一樣的。 安裝Zookeeper集群 下載安裝包 從官網上下載安裝包: curl https://dlcdn.apache.org/ ...
  • 原文鏈接:https://www.zhoubotong.site/post/94.html 說下背景吧,大家在開發中可能在不同的目錄(package)下定義了相同的struct(屬性參數完全一樣如名字、個數和類型),在方法調用傳參數的時候,可能是用到了其中某一個struct的引用。 那麼這裡就牽扯到 ...
  • Java的反射機制允許程式員在執行期藉助於Reflection API取得任何類的內部信息,並能操作對象的屬性和方法,在各類框架中應用非常廣泛。這一期是關於反射內容的筆記,包含Class類、Field類、Method類、Constructor類及相關方法。 ...
  • Gin 環境:https://goproxy.cn,driect github.com/gin-gonic/gin 介紹 Gin 是一個用 Go (Golang) 編寫的 Web 框架。 它具有類似 martini 的 API,性能要好得多,多虧了 httprouter,速度提高了 40 倍。 如果 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...