Apache Flink是一個在無界和有界數據流上進行有狀態計算的框架。Flink提供了不同抽象級別的多個API,併為常見用例提供了專用庫。 在這裡,我們介紹Flink易於使用且富有表現力的API和庫。 流媒體應用的構建塊 流處理框架可以構建和執行的應用程式類型取決於該框架對流、狀態和時間的控製程度 ...
Apache Flink是一個在無界和有界數據流上進行有狀態計算的框架。Flink提供了不同抽象級別的多個API,併為常見用例提供了專用庫。
在這裡,我們介紹Flink易於使用且富有表現力的API和庫。
流媒體應用的構建塊
流處理框架可以構建和執行的應用程式類型取決於該框架對流、狀態和時間的控製程度。在下麵,我們將描述流處理應用程式的這些構建塊,並解釋Flink處理它們的方法。
Streams
顯然,流是流處理的一個基本方面。然而,流可以具有不同的特性,這些特性會影響流的處理方式。Flink是一個通用的處理框架,可以處理任何類型的流。
- 有界和無界流:流可以是無界或有界的,即固定大小的數據集。Flink具有處理無限流的複雜功能,但也有專門的操作員來高效處理有界流。
- 實時和記錄流:所有數據都以流的形式生成。有兩種方法來處理數據。在生成流時實時處理它,或將流持久化到存儲系統(例如,文件系統或對象存儲),然後進行處理。Flink應用程式可以處理記錄或實時流。
state
每個非平凡的流應用程式都是有狀態的,也就是說,只有對單個事件應用轉換的應用程式不需要狀態。任何運行基本業務邏輯的應用程式都需要記住事件或中間結果,以便在稍後的時間點訪問它們,例如,當接收到下一個事件時或在特定的持續時間之後。
應用狀態是Flink中最重要的一個特征。通過查看Flink在狀態處理上下文中提供的所有特性,可以看出這一點。
- Flink為不同的數據結構(如原子值、列表或映射)提供狀態原語。開發人員可以根據函數的訪問模式選擇最有效的狀態原語。
- 可插拔狀態後端:應用程式狀態由可插拔狀態後端管理和檢查。Flink具有不同的狀態後端,將狀態存儲在記憶體或RocksDB中,RocksDB是一種高效的嵌入式磁碟數據存儲。自定義狀態後端也可以插入。
- 精確一次狀態一致性:Flink的檢查點和恢復演算法保證了在出現故障時應用程式狀態的一致性。因此,故障的處理是透明的,不會影響應用程式的正確性。
- 超大狀態:由於其非同步和增量檢查點演算法,Flink能夠保持數TB大小的應用程式狀態。
- 可擴展應用程式:Flink通過將狀態重新分配給更多或更少的工作人員,支持有狀態應用程式的擴展。
時間
時間是流媒體應用程式的另一個重要組成部分。大多數事件流都有內在的時間語義,因為每個事件都是在特定的時間點生成的。此外,許多常見的流計算都是基於時間的,例如windows聚合、會話、模式檢測和基於時間的連接。流處理的一個重要方面是應用程式如何測量時間,即事件時間和處理時間的差異。
Flink提供了一系列豐富的與時間相關的功能。
事件時間模式:使用事件時間語義處理流的應用程式根據事件的時間戳計算結果。因此,無論是處理記錄的還是實時的事件,事件時間處理都允許獲得準確且一致的結果。
水印支持:Flink在事件時間應用程式中使用水印來推理時間。水印也是一種靈活的機制,可以權衡結果的延遲和完整性。
延遲數據處理:在使用水印以事件時間模式處理流時,可能會發生在所有相關事件到達之前計算已經完成的情況。這種事件稱為遲發事件。Flink提供了多個選項來處理延遲事件,例如通過側輸出重新路由事件,以及更新之前完成的結果。
處理時間模式:除了事件時間模式外,Flink還支持處理時間語義,該語義執行由處理器的掛鐘時間觸發的計算。處理時間模式可以適用於某些具有嚴格低延遲要求的應用程式,這些應用程式可以容忍近似的結果。
分層 API
Flink 根據抽象程度分層,提供了三種不同的 API。每一種 API 在簡潔性和表達力上有著不同的側重,並且針對不同的應用場景。
下文中,我們將簡要描述每一種 API 及其應用,並提供相關的代碼示例。
/**
* 將相鄰的 keyed START 和 END 事件相匹配並計算兩者的時間間隔 * 輸入數據為 Tuple2<String, String> 類型,第一個欄位為 key 值, * 第二個欄位標記 START 和 END 事件。 */public static class StartEndDuration
extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> {
private ValueState<Long> startTime;
@Override
public void open(Configuration conf) {
// obtain state handle
startTime = getRuntimeContext()
.getState(new ValueStateDescriptor<Long>("startTime", Long.class));
}
/** Called for each processed event. */
@Override
public void processElement(
Tuple2<String, String> in,
Context ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
switch (in.f1) {
case "START":
// set the start time if we receive a start event.
startTime.update(ctx.timestamp());
// register a timer in four hours from the start event.
ctx.timerService()
.registerEventTimeTimer(ctx.timestamp() + 4 * 60 * 60 * 1000);
break;
case "END":
// emit the duration between start and end event
Long sTime = startTime.value();
if (sTime != null) {
out.collect(Tuple2.of(in.f0, ctx.timestamp() - sTime));
// clear the state
startTime.clear();
}
default:
// do nothing
}
}
/** Called when a timer fires. */
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple2<String, Long>> out) {
// Timeout interval exceeded. Cleaning up the state.
startTime.clear();
}}
這個例子充分展現了 KeyedProcessFunction 強大的表達力,也因此是一個實現相當複雜的介面。
DataStream API
DataStream API 為許多通用的流處理操作提供了處理原語。這些操作包括視窗、逐條記錄的轉換操作,在處理事件時進行外部資料庫查詢等。DataStream API 支持 Java 和 Scala 語言,預先定義了例如map()、reduce()、aggregate() 等函數。你可以通過擴展實現預定義介面或使用 Java、Scala 的 lambda 表達式實現自定義的函數。
下麵的代碼示例展示瞭如何捕獲會話時間範圍內所有的點擊流事件,並對每一次會話的點擊量進行計數。
// 網站點擊 Click 的數據流DataStream<Click> clicks =
DataStream<Tuple2<String, Long>> result = clicks
// 將網站點擊映射為 (userId, 1) 以便計數
.map(
// 實現 MapFunction 介面定義函數
new MapFunction<Click, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Click click) {
return Tuple2.of(click.userId, 1L);
}
})
// 以 userId (field 0) 作為 key
.keyBy(0)
// 定義 30 分鐘超時的會話視窗
.window(EventTimeSessionWindows.withGap(Time.minutes(30L)))
// 對每個會話視窗的點擊進行計數,使用 lambda 表達式定義 reduce 函數
.reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));
SQL & Table API
Flink 支持兩種關係型的 API,Table API 和 SQL。這兩個 API 都是批處理和流處理統一的 API,這意味著在無邊界的實時數據流和有邊界的歷史記錄數據流上,關係型 API 會以相同的語義執行查詢,並產生相同的結果。Table API 和 SQL 藉助了 Apache Calcite 來進行查詢的解析,校驗以及優化。它們可以與 DataStream 和 DataSet API 無縫集成,並支持用戶自定義的標量函數,聚合函數以及表值函數。
Flink 的關係型 API 旨在簡化數據分析、數據流水線和 ETL 應用的定義。
下麵的代碼示例展示瞭如何使用 SQL 語句查詢捕獲會話時間範圍內所有的點擊流事件,並對每一次會話的點擊量進行計數。此示例與上述 DataStream API 中的示例有著相同的邏輯。
SELECT userId, COUNT(*)FROM clicksGROUP BY SESSION(clicktime, INTERVAL '30' MINUTE), userId
庫
Flink 具有數個適用於常見數據處理應用場景的擴展庫。這些庫通常嵌入在 API 中,且並不完全獨立於其它 API。它們也因此可以受益於 API 的所有特性,並與其他庫集成。
- 複雜事件處理(CEP):模式檢測是事件流處理中的一個非常常見的用例。Flink 的 CEP 庫提供了 API,使用戶能夠以例如正則表達式或狀態機的方式指定事件模式。CEP 庫與 Flink 的 DataStream API 集成,以便在 DataStream 上評估模式。CEP 庫的應用包括網路入侵檢測,業務流程監控和欺詐檢測。
- DataSet API:DataSet API 是 Flink 用於批處理應用程式的核心 API。DataSet API 所提供的基礎運算元包括map、reduce、(outer) join、co-group、iterate等。所有運算元都有相應的演算法和數據結構支持,對記憶體中的序列化數據進行操作。如果數據大小超過預留記憶體,則過量數據將存儲到磁碟。Flink 的 DataSet API 的數據處理演算法借鑒了傳統資料庫演算法的實現,例如混合散列連接(hybrid hash-join)和外部歸併排序(external merge-sort)。
- Gelly: Gelly 是一個可擴展的圖形處理和分析庫。Gelly 是在 DataSet API 之上實現的,並與 DataSet API 集成。因此,它能夠受益於其可擴展且健壯的操作符。Gelly 提供了內置演算法,如 label propagation、triangle enumeration 和 page rank 演算法,也提供了一個簡化自定義圖演算法實現的 Graph API。