大數據Hadoop之——Flink的狀態管理和容錯機制(checkpoint)

来源:https://www.cnblogs.com/liugp/archive/2022/05/11/16260166.html
-Advertisement-
Play Games

一、Flink中的狀態 官方文檔 有狀態的計算是流處理框架要實現的重要功能,因為稍複雜的流處理場景都需要記錄狀態,然後在新流入數據的基礎上不斷更新狀態。下麵的幾個場景都需要使用流處理的狀態功能: 數據流中的數據有重覆,想對重覆數據去重,需要記錄哪些數據已經流入過應用,當新數據流入時,根據已流入過的數 ...


目錄

一、Flink中的狀態

官方文檔

有狀態的計算是流處理框架要實現的重要功能,因為稍複雜的流處理場景都需要記錄狀態,然後在新流入數據的基礎上不斷更新狀態。下麵的幾個場景都需要使用流處理的狀態功能:

  • 數據流中的數據有重覆,想對重覆數據去重,需要記錄哪些數據已經流入過應用,當新數據流入時,根據已流入過的數據來判斷去重。
  • 檢查輸入流是否符合某個特定的模式,需要將之前流入的元素以狀態的形式緩存下來。比如,判斷一個溫度感測器數據流中的溫度是否在持續上升。
  • 對一個時間視窗內的數據進行聚合分析,分析一個小時內某項指標的75分位或99分位的數值。

個狀態更新和獲取的流程如下圖所示,一個運算元子任務接收輸入流,獲取對應的狀態,根據新的計算結果更新狀態。

1)鍵控狀態(Keyed State)

keyed state 介面提供不同類型狀態的訪問介面,這些狀態都作用於當前輸入數據的 key 下。換句話說,這些狀態僅可在 KeyedStream 上使用,在Java/Scala API上可以通過 stream.keyBy(...) 得到 KeyedStream,在Python API上可以通過 stream.key_by(...) 得到 KeyedStream。

1、控制項狀態特點

  • 鍵控狀態是根據輸入數據流中定義的鍵(key)來維護和訪問的
  • Flink 為每個 key 維護一個狀態實例,並將具有相同鍵的所有數據,都分區到同一個運算元任務中,這個任務會維護和處理這個 key 對應的狀態
  • 當任務處理一條數據時,它會自動將狀態的訪問範圍限定為當前數據的 key

2、鍵控狀態類型

鍵控狀態類型 說明 方法
ValueState[T] 值狀態,保存一個可以更新和檢索的值 ValueState.update(value: T)
ValueState.value()
ListState[T] 列表狀態,保存一個元素的列表可以往這個列表中追加數據,併在當前的列表上進行檢索。 ListState.add(value: T)
ListState.addAll(values: java.util.List[T])
ListState.update(values: java.util.List[T])
ListState.get()(註意:返回的是Iterable[T]
ReducingState 聚合狀態,保存一個單值,表示添加到狀態的所有值的聚合,介面與 ListState 類似,但使用 add(T) 增加元素,會使用提供的 ReduceFunction 進行聚合。 ReducingState.add(value: T)
ReducingState.get()
AggregatingState<IN, OUT> 聚合狀態,保留一個單值,表示添加到狀態的所有值的聚合。和 ReducingState 相反的是, 聚合類型可能與 添加到狀態的元素的類型不同。 介面與 ListState 類似,但使用 add(IN) 添加的元素會用指定的 AggregateFunction 進行聚合。 AggregatingState.add(value: T)
AggregatingState.get()
MapState<UK, UV> 映射狀態,維護了一個映射列表,保存Key-Value對。 MapState.get(key: K)
MapState.put(key: K, value: V)
MapState.contains(key: K)
MapState.remove(key: K)

【溫馨提示】所有類型的狀態還有一個clear() 方法,清除當前 key 下的狀態數據,也就是當前輸入元素的 key。

3、狀態有效期 (TTL)

任何類型的 keyed state 都可以有 有效期 (TTL)。所有狀態類型都支持單元素的 TTL。 這意味著列表元素和映射元素將獨立到期

【官網示例】

package com
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

object StateTest001 {
  def main(args: Array[String]): Unit = {
    val ttlConfig = StateTtlConfig
      .newBuilder(Time.seconds(1))
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      .build

    val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
    stateDescriptor.enableTimeToLive(ttlConfig)
    
  }
}

TTL 配置有以下幾個選項:

  • newBuilder 的第一個參數表示數據的有效期,是【必選項】。
  • TTL 的更新策略(預設是 OnCreateAndWrite)
    1. StateTtlConfig.UpdateType.OnCreateAndWrite - 僅在創建和寫入時更新
    2. StateTtlConfig.UpdateType.OnReadAndWrite - 讀取時也更新
1)過期數據的清理

預設情況下,過期數據會在讀取的時候被刪除,例如 ValueState#value,同時會有後臺線程定期清理(如果 StateBackend 支持的話)。可以通過 StateTtlConfig 配置關閉後臺清理:

import org.apache.flink.api.common.state.StateTtlConfig

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .disableCleanupInBackground
    .build
2)全量快照時進行清理

可以啟用全量快照時進行清理的策略,這可以減少整個快照的大小。當前實現中不會清理本地的狀態,但從上次快照恢復時,不會恢復那些已經刪除的過期數據。 該策略可以通過 StateTtlConfig 配置進行配置:

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot
    .build

【溫馨提示】這種策略在 RocksDBStateBackend 的增量 checkpoint 模式下無效。

3)增量數據清理

另外可以選擇增量式清理狀態數據,在狀態訪問或/和處理時進行。如果某個狀態開啟了該清理策略,則會在存儲後端保留一個所有狀態的惰性全局迭代器。 每次觸發增量清理時從迭代器中選擇已經過期的數進行清理

4)在 RocksDB 壓縮時清理

如果使用 RocksDB state backend,則會啟用 Flink 為 RocksDB 定製的壓縮過濾器。RocksDB 會周期性的對數據進行合併壓縮從而減少存儲空間。 Flink 提供的 RocksDB 壓縮過濾器會在壓縮時過濾掉已經過期的狀態數據。該特性可以通過 StateTtlConfig 進行配置:

import org.apache.flink.api.common.state.StateTtlConfig

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build

【註意】

  • 如果沒有 state 訪問,也沒有處理數據,則不會清理過期數據。
  • 增量清理會增加數據處理的耗時。
  • 現在僅 Heap state backend 支持增量清除機制。在 RocksDB state backend 上啟用該特性無效。
  • 如果 Heap state backend 使用同步快照方式,則會保存一份所有 key 的拷貝,從而防止併發修改問題,因此會增加記憶體的使用。但非同步快照則沒有這個問題。
  • 對已有的作業,這個清理方式可以在任何時候通過 StateTtlConfig 啟用或禁用該特性,比如從 savepoint 重啟後。

4、鍵控狀態的使用

除了上面描述的介面之外,Scala API 還在 KeyedStream 上對 map() 和 flatMap() 訪問 ValueState 提供了一個更便捷的介面mapWithState。 用戶函數能夠通過 Option 獲取當前 ValueState 的值,並且返回即將保存到狀態的值。

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })

2)運算元狀態(Operatior State)

運算元狀態(或者非 keyed 狀態)是綁定到一個並行運算元實例的狀態。Kafka Connector 是 Flink 中使用運算元狀態一個很具有啟發性的例子。Kafka consumer 每個並行實例維護了 topic partitions 和偏移量的 map 作為它的運算元狀態。

【溫馨提示】 Python DataStream API 仍無法支持運算元狀態。

1、運算元狀態特點

  • 運算元狀態的作用範圍限定為運算元任務,由同一併行任務所處理的所有數據都可以訪問到相同的狀態
  • 狀態對於同一子任務而言是共用
  • 運算元狀態不能由相同或不同運算元的另一個子任務訪問

2、運算元狀態類型

鍵控狀態類型 說明
列表狀態(ListState) 將狀態表示為一組數據的列表
聯合列表狀態(UnionListState) 也將狀態表示為數據的列表。它與常規列表狀態的區別在於,在發生故障時,或者從保存點(savepoint)啟動應用程式時如何恢復
廣播狀態(BroadcastState) 如果一個運算元有多項任務,而它的每項任務狀態又都相同,那麼這種特殊情況最適合應用廣播狀態。

3)廣播狀態 (Broadcast State)

廣播狀態是一種特殊的運算元狀態。引入它的目的在於支持一個流中的元素需要廣播到所有下游任務的使用情形。在這些任務中廣播狀態用於保持所有子任務狀態相同。 該狀態接下來可在第二個處理記錄的數據流中訪問。廣播狀態和其他運算元狀態的不同之處在於:

  • 它具有 map 格式,
  • 它僅在一些特殊的運算元中可用。這些運算元的輸入為一個廣播數據流和非廣播數據流,
  • 這類運算元可以擁有不同命名的多個廣播狀態 。

【溫馨提示】 Python DataStream API 仍無法支持運算元狀態。

二、狀態後端(State Backends)

狀態的存儲、訪問以及維護,由一個可插入的組件決定,這個組件就
叫做狀態後端(state backend) ,狀態後端主要負責兩件事:本地的狀態管理,以及將檢查點(checkpoint)狀態寫入遠程存儲

1)三種狀態存儲方式

存儲方式 說明
MemoryStateBackend 【預設模式】狀將鍵控狀態作為記憶體中的對象進行管理,將它們存儲在TaskManager的JVM堆上,將checkpoint存儲在JobManager的記憶體中。主要適用於本地開發和調試
FsStateBackend 基於文件系統進行存儲,可以是本地文件系統,也可以是 HDFS 等分散式文件系統。 需要註意而是雖然選擇使用了 FsStateBackend ,但正在進行的數據仍然是存儲在 TaskManager 的記憶體中的,只有在 checkpoint 時,才會將狀態快照寫入到指定文件系統上。
RocksDBStateBackend 將所有狀態序列化後,存入本地的RocksDB中存儲。

【溫馨提示】特別在 MemoryStateBackend 內使用HeapKeyedStateBackend時,Checkpoint 序列化數據階段預設有最大 5 MB數據的限制。

對於HeapKeyedStateBackend,有兩種實現:

  • 支持非同步 Checkpoint(預設):存儲格式 CopyOnWriteStateMap
  • 僅支持同步 Checkpoint:存儲格式 NestedStateMap

2)配置方式

Flink 支持使用兩種方式來配置後端管理器:

1、【第一種方式】基於代碼方式進行配置

【溫馨提示】只對當前作業生效

// 配置 FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink/checkpoints"));
// 配置 RocksDBStateBackend
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:port/flink/checkpoints"));

配置 RocksDBStateBackend 時,需要額外導入下麵的依賴:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.9.0</version>
</dependency>

【溫馨提示】對所有部署在該集群上的作業都生效

state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:port/flink/checkpoints

三、容錯機制(checkpoint)

checkpoint是Flink容錯的核心機制。它可以定期地將各個Operator處理的數據進行快照存儲( Snapshot )。如果Flink程式出現宕機,可以重新從這些快照中恢複數據。

1)一致性

談到容錯性,就沒法避免一致性這個概念。所謂一致性就是:成功處理故障並恢復之後得到的結果與沒有發生任何故障是得到的結果相比,前者的正確性。換句大白話,就是故障的發生是否影響得到的結果。在流處理過程,一致性分為3個級別

  • at-most-once:至多一次。故障發生之後,計算結果可能丟失,就是無法保證結果的正確性;
  • at-least-once:至少一次。計算結果可能大於正確值,但絕不會小於正確值,就是計算程式發生故障後可能多算,但是絕不可能少算;
  • exactly-once:精確一次。系統保證發生故障後得到的計算結果的值和正確值一致;

Flink的容錯機制保證了exactly-once,也可以選擇at-least-once。Flink的容錯機制是通過對數據流不停的做快照(snapshot)實現的。

2)檢查點(checkpoint)

Flink 中的每個方法或運算元都能夠是有狀態的,為了讓狀態容錯,Flink 需要為狀態添加 checkpoint(檢查點)。Checkpoint 使得 Flink 能夠恢復狀態和在流中的位置,從而嚮應用提供和無故障執行時一樣的語義。官方文檔

1、開啟與配置 Checkpoint

預設情況下 checkpoint 是禁用的。通過調用 StreamExecutionEnvironmentenableCheckpointing(n) 來啟用 checkpoint,裡面的 n 是進行 checkpoint 的間隔,單位毫秒

2、Checkpoint 屬性

屬性 說明
精確一次(exactly-once) 你可以選擇向 enableCheckpointing(long interval, CheckpointingMode mode) 方法中傳入一個模式來選擇保證等級級別。
checkpoint 超時 如果 checkpoint 執行的時間超過了該配置的閾值,還在進行中的 checkpoint 操作就會被拋棄
checkpoints 之間的最小時間 該屬性定義在 checkpoint 之間需要多久的時間,以確保流應用在 checkpoint 之間有足夠的進展。如果值設置為了 5000, 無論 checkpoint 持續時間與間隔是多久,在前一個 checkpoint 完成時的至少五秒後會才開始下一個 checkpoint。
checkpoint 可容忍連續失敗次數 該屬性定義可容忍多少次連續的 checkpoint 失敗。超過這個閾值之後會觸發作業錯誤 fail over。 預設次數為“0”,這意味著不容忍 checkpoint 失敗,作業將在第一次 checkpoint 失敗時fail over。
併發 checkpoint 的數目 預設情況下,在上一個 checkpoint 未完成(失敗或者成功)的情況下,系統不會觸發另一個 checkpoint。這確保了拓撲不會在 checkpoint 上花費太多時間,從而影響正常的處理流程。 不過允許多個 checkpoint 並行進行是可行的,對於有確定的處理延遲(例如某方法所調用比較耗時的外部服務),但是仍然想進行頻繁的 checkpoint 去最小化故障後重跑的 pipelines 來說,是有意義的。
externalized checkpoints 你可以配置周期存儲 checkpoint 到外部系統中。Externalized checkpoints 將他們的元數據寫到持久化存儲上並且在 job 失敗的時候不會被自動刪除。 這種方式下,如果你的 job 失敗,你將會有一個現有的 checkpoint 去恢復。更多的細節請看 Externalized checkpoints 的部署文檔

【官網示例】

val env = StreamExecutionEnvironment.getExecutionEnvironment()

// 每 1000ms 開始一次 checkpoint
env.enableCheckpointing(1000)

// 高級選項:

// 設置模式為精確一次 (這是預設值)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// 確認 checkpoints 之間的時間會進行 500 ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

// Checkpoint 必須在一分鐘內完成,否則就會被拋棄
env.getCheckpointConfig.setCheckpointTimeout(60000)

// 允許兩個連續的 checkpoint 錯誤
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2)

// 同一時間只允許一個 checkpoint 進行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

// 使用 externalized checkpoints,這樣 checkpoint 在作業取消後仍就會被保留
env.getCheckpointConfig().enableExternalizedCheckpoints(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

// 開啟實驗性的 unaligned checkpoints
env.getCheckpointConfig.enableUnalignedCheckpoints()

3)從檢查點恢復狀態

  • 【第一步】遇到故障之後,第一步就是重啟應用
  • 【第二步】是從 checkpoint 中讀取狀態,將狀態重置,從檢查點重新啟動應用程式後,其內部狀態與檢查點完成時的狀態完全相同
  • 【第三步】開始消費並處理檢查點到發生故障之間的所有數據,這種檢查點的保存和恢復機制可以為應用程式狀態提供“精確一次”(exactly- once)的一致性,因為所有運算元都會保存檢查點並恢復其所有狀態,這樣一來所有的輸入流就都會被重置到檢查點完成時的位置

4)檢查點的實現演算法

  • 【一種簡單的想法】:暫停應用,保存狀態到檢查點,再重新恢復應用
  • 【Flink 的改進實現】:
    1. 基於 Chandy-Lamport 演算法的分散式快照
    2. 將檢查點的保存和數據處理分離開,不暫停整個應用

5)檢查點演算法

基於Chandy-Lamport演算法實現的分散式快照

1、檢查點分界線(Checkpoint Barrier)

  • 將barrier插入到數據流中,作為數據流的一部分和數據一起向下流動。Barrier不會幹擾正常數據,數據流嚴格有序。

  • 一個barrier把數據流分割成兩部分:一部分進入到當前快照,另一部分進入到下一個快照。

  • 每一個barrier都帶有快照ID,並且barrier之前的數據都進入了此快照。Barrier不會幹擾數據流處理,所以非常輕量。

  • 多個不同快照的多個barrier會在流中同時出現,即多個快照可能同時創建。

2、Barrier對齊

當一個opeator有多個輸入流的時候,checkpoint barrier n 會進行對齊,就是已到達的會先緩存到buffer里等待其他未到達的,一旦所有流都到達,則會向下游廣播,exactly-once 就是利用這一特性實現的,at least once 因為不會進行對齊,就會導致有的數據被重覆處理。

3、執行一次檢查點步驟

  1. jobManager會向每個source任務發送一條帶有新檢查點ID的消息,通過這種方式來啟動檢查點。
  2. 數據源將他們各自的狀態寫入檢查點後,並向下游所有分區發出一個檢查點barrier。狀態後端在狀態存入檢查點之後,會返回通知給source任務,source任務再向jobmanager確認檢查點完成。
  3. barrier向下游傳遞,下游任務會等待所有輸入分區的barrier的到達後再做狀態保存通知jobmanager狀態保存完成,並再向下游所有分區發送收到的檢查點barrier。

【溫馨提示】對於barrier已經到達的分區,繼續到達的數據會被緩存;對於barrier未到達的分區,數據會被正常處理所有barrier都到達後,做完狀態保存且向下游發送檢查點barrier後,當前任務繼續處理緩存的數據和後面到來的數據。

  1. sink任務向jobmanager確認狀態保存到checkpoint完成。即所有任務都確認已成功將狀態保存到檢查點時,檢查點就真正完成了。

6)保存點(savepoint)

1、概述

  • Flink 還提供了可以自定義的鏡像保存功能,就是保存點(savepoints)
  • 原則上,創建保存點使用的演算法與檢查點完全相同,因此保存點可以認為就是具有一些額外元數據的檢查點;
  • Flink不會自動創建保存點,因此用戶(或者外部調度程式)必須明確地觸發創建操作;
  • 保存點是一個強大的功能。除了故障恢復外,保存點可以用於:有計劃的手動備份,更新應用程式,版本遷移,暫停和重啟應用,等等。

2、savepoint觸發的三種方式

  1. 使用 flink savepoint 命令觸發 Savepoint,其是在程式運行期間觸發 savepoint。

  2. 使用 flink cancel -s 命令,取消作業時,並觸發 Savepoint。

  3. 使用 Rest API 觸發 Savepoint,格式為:/jobs/:jobid /savepoints

7)檢查點(checkpoint)與 保存點(savepoint)的區別與聯繫

  • checkpoint的側重點是“容錯”,即Flink作業意外失敗並重啟之後,能夠直接從早先打下的checkpoint恢復運行,且不影響作業邏輯的準確性。而savepoint的側重點是“維護”,即Flink作業需要在人工干預下手動重啟、升級、遷移或A/B測試時,先將狀態整體寫入可靠存儲,維護完畢之後再從savepoint恢復現場。
  • savepoint是“通過checkpoint機制”創建的,所以savepoint本質上是特殊的checkpoint。
  • checkpoint面向Flink Runtime本身,由Flink的各個TaskManager定時觸發快照並自動清理,一般不需要用戶干預savepoint面向用戶完全根據用戶的需要觸發與清理
  • checkpoint的頻率往往比較高(因為需要儘可能保證作業恢復的準確度),所以checkpoint的存儲格式非常輕量級,但作為trade-off犧牲了一切可移植(portable)的東西,比如不保證改變並行度和升級的相容性。savepoint則以二進位形式存儲所有狀態數據和元數據,執行起來比較慢而且“貴”,但是能夠保證portability,如並行度改變或代碼升級之後,仍然能正常恢復。

未完待續,請耐心等待~


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 垃圾回收(GC)是托管語言必備的技術之一。GC 的性能是影響托管語言性能的關鍵。我們的 .NET 既能寫桌面程式 (WINFROM , WPF) 又能寫 web 程式 (ASP.NET CORE),甚至還能寫移動端程式。。。不同使用場景的程式對 GC 的風格也有不同的要求,比如桌面程式更註重界面的響 ...
  • rpm資源包下載 在一些內網或區域網環境中,無法通過 yum install xxx 進行程式包的下載安裝。 需要從具有外網環境的電腦上下載離線程式包,拷貝至內網環境中手動安裝。 方法一:使用 yum 下載 yum --downloadonly --downloaddir=/home/package ...
  • 1.第一個shell vi first.sh !/bin/bash 作者:Arya 編寫時間:2022-04-22 功能:this is my first blog! echo "this is my first shell!" 2.crond服務 以守護進程方式在無需人工干預的情況下來處理著一系列 ...
  • https://www.cnblogs.com/yeungchie/ XFCE是一款輕量級 Linux 桌面,當前版本已經將所有部件從 GTK2 更新到 GTK3,從D-Dbus Glib更新到GDBus,大部分組件支持Object Introspection(簡稱 GI,用於產生與解析 C 程式庫 ...
  • 為什麼要使用Docusaurus Docusaurus 是 Facebook 專門為開源項目開發者提供的一款易於維護的靜態網站創建工具,使用 Markdown 即可更新網站。構建一個帶有主頁、文檔、API、幫助以及博客頁面的靜態網站,只需5分鐘。 Docusaurus 是一個靜態站點生成器。它構建了 ...
  • cat命令詳解 用法 功能 cat filename 獲取文件內容 cat file1 file2 > newfile 將file2的內容追加到file1,生成新文件newfile,但不會刪除原文件 cat > file 創建並編輯file,若file存在,則原文件內容被覆蓋, 按ctrl c 或者 ...
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡雲開源鏡像站 第一次在Linux雲伺服器上部署前後端分離項目,查了很多資料和視頻,踩了許多坑。成功實現部署若依的前後端分離項目後,想記錄一下前後端部署的過程,供學習的小伙伴參考。 1.環境準備 一定要在開始前先準備好以下工具和環境(可以上網查找安裝的方法),後 ...
  • 引言 我們在定時任務中經常能接觸到cron表達式,但是在寫cron表達式的時候我們會遇到各種各樣版本的cron表達式,比如我遇到過5位、6位甚至7位的cron表達式,導致我一度搞混這些表達式。更嚴重的是,當我們沒有準確寫出cron表達式時,會出現定時任務一直沒有執行,或者定時任務執行太頻繁的糟糕情況 ...
一周排行
    -Advertisement-
    Play Games
  • Dapr Outbox 是1.12中的功能。 本文只介紹Dapr Outbox 執行流程,Dapr Outbox基本用法請閱讀官方文檔 。本文中appID=order-processor,topic=orders 本文前提知識:熟悉Dapr狀態管理、Dapr發佈訂閱和Outbox 模式。 Outbo ...
  • 引言 在前幾章我們深度講解了單元測試和集成測試的基礎知識,這一章我們來講解一下代碼覆蓋率,代碼覆蓋率是單元測試運行的度量值,覆蓋率通常以百分比表示,用於衡量代碼被測試覆蓋的程度,幫助開發人員評估測試用例的質量和代碼的健壯性。常見的覆蓋率包括語句覆蓋率(Line Coverage)、分支覆蓋率(Bra ...
  • 前言 本文介紹瞭如何使用S7.NET庫實現對西門子PLC DB塊數據的讀寫,記錄了使用電腦模擬,模擬PLC,自至完成測試的詳細流程,並重點介紹了在這個過程中的易錯點,供參考。 用到的軟體: 1.Windows環境下鏈路層網路訪問的行業標準工具(WinPcap_4_1_3.exe)下載鏈接:http ...
  • 從依賴倒置原則(Dependency Inversion Principle, DIP)到控制反轉(Inversion of Control, IoC)再到依賴註入(Dependency Injection, DI)的演進過程,我們可以理解為一種逐步抽象和解耦的設計思想。這種思想在C#等面向對象的編 ...
  • 關於Python中的私有屬性和私有方法 Python對於類的成員沒有嚴格的訪問控制限制,這與其他面相對對象語言有區別。關於私有屬性和私有方法,有如下要點: 1、通常我們約定,兩個下劃線開頭的屬性是私有的(private)。其他為公共的(public); 2、類內部可以訪問私有屬性(方法); 3、類外 ...
  • C++ 訪問說明符 訪問說明符是 C++ 中控制類成員(屬性和方法)可訪問性的關鍵字。它們用於封裝類數據並保護其免受意外修改或濫用。 三種訪問說明符: public:允許從類外部的任何地方訪問成員。 private:僅允許在類內部訪問成員。 protected:允許在類內部及其派生類中訪問成員。 示 ...
  • 寫這個隨筆說一下C++的static_cast和dynamic_cast用在子類與父類的指針轉換時的一些事宜。首先,【static_cast,dynamic_cast】【父類指針,子類指針】,兩兩一組,共有4種組合:用 static_cast 父類轉子類、用 static_cast 子類轉父類、使用 ...
  • /******************************************************************************************************** * * * 設計雙向鏈表的介面 * * * * Copyright (c) 2023-2 ...
  • 相信接觸過spring做開發的小伙伴們一定使用過@ComponentScan註解 @ComponentScan("com.wangm.lifecycle") public class AppConfig { } @ComponentScan指定basePackage,將包下的類按照一定規則註冊成Be ...
  • 操作系統 :CentOS 7.6_x64 opensips版本: 2.4.9 python版本:2.7.5 python作為腳本語言,使用起來很方便,查了下opensips的文檔,支持使用python腳本寫邏輯代碼。今天整理下CentOS7環境下opensips2.4.9的python模塊筆記及使用 ...