Flink程式構建的基本單元是stream和transformation(DataSet實質上也是stream)。stream是一個中間結果數據,transformation對數據的加工和操作,該操作以一個或多個stream為輸入,計算輸出一個或多個stream為結果,最後可以sink來存儲數據。 ...
一、統計流程
所有流計算統計的流程都是:
1、接入數據源
2、進行多次數據轉換操作(過濾、拆分、聚合計算等)
3、計算結果的存儲 其中數據源可以是多個、數據轉換的節點處理完數據可以發送到一個和多個下一個節點繼續處理數據
Flink程式構建的基本單元是stream和transformation(DataSet實質上也是stream)。stream是一個中間結果數據,transformation對數據的加工和操作,該操作以一個或多個stream為輸入,計算輸出一個或多個stream為結果,最後可以sink來存儲數據。
包括數據源,每一次發射出來的數據結果都通過DataStream來傳遞給下一級繼續處理
每一個Transformation要有2步:
1、處理數據
2、將處理完的數據發射出去
二、Flink的數據源
Flink提供數據源只需要實現SourceFunction介面即可。 SourceFunction有一個抽象實現類RichParallelSourceFunction 繼承該實現類,實現3個方法,既可以自定義Source public void open(Configuration parameters) //初始化時調用,可以初始化一些參數 public void run(SourceContext
該例子中是每20秒發送出去一個Order類型的實體。
三、Flink的數據轉換操作
Flink針對於不同的場景提供了不同的解決方案,減少了用戶去關註處理過程中的效率問題。
常見的操作有下麵這些:“map”就是做一些映射,比如我們把兩個字元串合併成一個字元串,把一個字元串拆成兩個或者三個字元串。
“flatMap”類似於把一個記錄拆分成兩條、三條、甚至是四條記錄,例如把一個字元串分割成一個字元數組。
“Filter”就類似於過濾。
“keyBy”就等效於SQL里的group by。
“aggregate”是一個聚合操作,如計數、求和、求平均等。
“reduce”就類似於MapReduce里的reduce。
“join”操作就有點類似於我們資料庫裡面的join。
“connect”實現把兩個流連成一個流。
“repartition”是一個重新分區操作(還沒研究)。
“project”操作就類似於SQL裡面的snacks(還沒研究)。
常見的操作有filter、map、flatMap、keyBy(分組)、aggregate(聚合) 具體的使用方式後面的例子中會體現。
三、視窗
流數據的計算可以把連續不斷的數據按照一定的規則拆分成大量的片段,在片段內進行統計和計算。比如可以把一小時內的數據保存到一個小的資料庫表裡,然後對這部分數據進行計算和統計,而流計算只不過是實時進行的。
常見的視窗有:
1、以時間為單位的Time Window,例如:每1秒鐘、每1個小時等
2、以數據的數量為單位的Count Window,例如:每一百個元素
Flink給我們提供了一些通用的時間視窗模型。
1、Tumbling Windows(不重疊的)
數據流中的每一條數據僅屬於一個視窗。每一個都有固定的大小,同時視窗間彼此之間不會出現重疊的部分。如果指定一個大小為5分鐘的tumbling視窗,那麼每5分鐘便會啟動一個視窗,如下圖所示:
2、Sliding Windows(重疊的)
與Tumbling視窗不同的是,在構建Sliding視窗時不僅需要指定視窗大小,還會指定一個視窗滑動參數(window slide parameter)來確定視窗的開始位置。因此當視窗滑動參數小於視窗大小時,視窗之間可能會出現重覆的區域。 例如,當你指定視窗大小為10分鐘,滑動參數為5分鐘時,如下圖所示:
3、Session Windows (會話視窗)
當數據流中一段時間沒有數據,則Session視窗會關閉。因此,Session Windows沒有固定的大小,無法計算Session視窗的開始位置。
四、Flink中的時間概念
Flink中有3中不同的時間概念
- 處理時間 Processing Time指的是我們上面進行Transformation操作時,當時的系統時間。
2.事件時間 Event Time指的是業務發生時間,每一條業務記錄上會攜帶一個時間戳,我們需要指定數據中那一個屬性中獲取。在按業務發生時間統計數據時,我們面臨一個問題,當我們接收的數據的時間是無序的時候,我們什麼時間去觸發聚合計算,我們不可能無限制的等待。Flink引入了Watermark的概念,這個Watermark是添加在視窗上的,是告訴視窗我們最長等待的時間是多久,超過這個時間的數據就拋棄不再處理。
- 提取時間 Ingestion Time指的是數據進入Flink當時的系統時間。
五、訂單統計的例子
第四步:設置時間戳和Watermarks
DataStream<Order> marksSource = vilidatedSource
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.minutes(1)){
@Override
public long extractTimestamp(Order o) {
return o.getTimestamp().getTime();
}
});
前面已經設置了使用EventTime來處理數據,那麼在進行時間視窗計算前必須給數據分配獲取時間戳的欄位,這裡設置了Order的timestamp欄位為EventTime,同時這裡也設置了一個1分鐘的Watermarks,表示最多等待1分鐘,業務發生時間超過系統時間1分鐘的數據都不進行統計。
第五步:數據分組
KeyedStream<Order, Tuple> keyedStream =
marksSource.keyBy("biz");//先以biz來Group
這裡設置了以Order中biz欄位進行分組,這就意味著所有biz相同的數據會進入到同一個時間視窗中進行計算。
第六步:指定時間視窗、聚合計算
DataStream<List<Tuple2<String, String>>> results = keyedStream
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new OrderSumAggregator()).setParallelism(1);
這裡設置了一個以1分鐘為單位的不重疊的TumblingEventTimeWindow。 然後使用OrderSumAggregator來進行聚合計算。 需要註意的是如果最前面設置的是使用ProcessTime來處理數據,這裡的視窗就會變成TumblingProcessTimeWinwow,前後必須一一對應,之前就因為前後不對應,統計結果不正確一直招不到原因。
六、聚合計算
上面例子中比較核心的部分就是聚合計算,也就是我們的OrderSumAggregator 聚合計算我們只需要實現Flink給我們提供的AggregateFunction介面,重寫其方法即可。
ACC createAccumulator();//創建一個數據統計的容器,提供給後續操作使用。
ACC add(IN in, ACC acc);//每個元素被添加進視窗的時候調用。 第一個參數是添加進視窗的元素,第二個參數是統計的容器(上面創建的那個)。
OUT getResult(ACC acc);//視窗統計事件觸發時調用來返回出統計的結果。
ACC merge(ACC acc1, ACC acc2);//只有在當視窗合併的時候調用,合併2個容器
其中這個容器根據情況也可以是在記憶體里提供,也可以是在其他存儲設備中提供。
通過上面的例子我們就實現了按照業務時間來統計每分鐘內的訂單數量,訂單最多可以延遲1分鐘上報。 但是我們為了等待1分鐘內上報的數據,造成了數據會延遲1分鐘進行統計,例如8點02分我們才能統計到8點到8點01分上報的數據。 為瞭解決這個問題,我們可以給window再增加一個自定義的統計觸發器,這個觸發器可以在整點觸發統計事件(也就是調用上面的getResults方法),這樣就達到了8點到8點01分這個時間段的數據,在8點01分統計一次,在8點02分再重新統計一次(加上後面1分鐘上報的數據)。
作者:京東科技 梁發文
來源:京東雲開發者社區 轉載請註明來源