案例來源 https://github.com/apache/flink-training/blob/release-1.14/hourly-tips/README_zh.md 案例介紹 基於計程車付費事件流計算出每小時賺取最多小費的司機,最簡單的方法是通過兩個步驟來解決這個問題:首先使用一個小時長 ...
案例來源 https://github.com/apache/flink-training/blob/release-1.14/hourly-tips/README_zh.md
案例介紹
基於計程車付費事件流計算出每小時賺取最多小費的司機,最簡單的方法是通過兩個步驟來解決這個問題:首先使用一個小時長的視窗來計算每個司機在一小時內的總小費,然後從該視窗結果流中找到每小時總小費最多的司機。
結果輸出:
每小時產生一個 HourlyTip對象 記錄的數據流。 這個記錄應包含該小時結束時的時間戳、 該小時內獲得小費最多的司機的 driverId 以及他的實際小費總數。
public class HourlyTip {
/**
* 小時結束時的時間戳
*/
private Long eventTime;
/**
* 司機id driverId
*/
private Long driverId;
/**
* 該小時獲得的小費總數
*/
private Float tips;
}
核心代碼
// 初始化環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 定義計程車-車費數據源
KafkaSource<TaxiFare> fareSource = KafkaSource.<TaxiFare>builder()
.setBootstrapServers("192.168.0.192:9092")
.setTopics("TOPIC_FARE")
.setGroupId("TEST_GROUP")
.setClientIdPrefix("fare") // 避免kafka clientId重覆
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new TaxiFareDeserialization())
.build();
DataStreamSource<TaxiFare> fareStream = env.fromSource(fareSource, WatermarkStrategy.<TaxiFare>forMonotonousTimestamps().withTimestampAssigner((fare, t) -> fare.getStartTime()), "fare source");
// 按司機分組,對每小時內的數據進行統計,求出每個司機每小時的總小費
SingleOutputStreamOperator<HourlyTip> hourlyTipsStream = fareStream.keyBy(TaxiFare::getDriverId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.process(new AddTipsFunction());
/**
* window和windowAll的區別
*
* keyBy後數據分流,window是把不同的key分開聚合成視窗
* 而windowAll是把所有的key都聚合起來,所以windowAll的並行度只能為1,而window可以有多個並行度
*
*/
// 把所有key彙總起來,找出每個小時總小費最多的司機
SingleOutputStreamOperator<HourlyTip> hourlyMaxStream = hourlyTipsStream.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).max("tips");
hourlyMaxStream.addSink(new PrintSinkFunction<>());
env.execute("Hourly Tips");
完整代碼
https://github.com/Mr-LuXiaoHua/study-flink
代碼入口 com.example.datastream.hourlytips.HourlyTipsJob