案例來源: https://github.com/apache/flink-training/blob/release-1.14/README_zh.md 案例背景 計程車車程(taxi ride)事件結構 1.每次車程都由兩個事件表示:行程開始(trip start)和行程結束(trip end) ...
案例來源: https://github.com/apache/flink-training/blob/release-1.14/README_zh.md
案例背景
計程車車程(taxi ride)事件結構
1.每次車程都由兩個事件表示:行程開始(trip start)和行程結束(trip end)。
2.每個事件都由十一個欄位組成:
rideId : Long // 每次車程的唯一id
taxiId : Long // 每一輛計程車的唯一id
driverId : Long // 每一位司機的唯一id
isStart : Boolean // 行程開始事件為 TRUE, 行程結束事件為 FALSE
eventTime : Long // 事件的時間戳
startLon : Float // 車程開始位置的經度
startLat : Float // 車程開始位置的維度
endLon : Float // 車程結束位置的經度
endLat : Float // 車程結束位置的維度
passengerCnt : Short // 乘車人數
計程車車費(taxi fare)事件結構
rideId : Long // 每次車程的唯一id
taxiId : Long // 每一輛計程車的唯一id
driverId : Long // 每一位司機的唯一id
startTime : Long // 車程開始時間
paymentType : String // 現金(CASH)或刷卡(CARD)
tip : Float // 小費
tolls : Float // 過路費
totalFare : Float // 總計車費
案例目標
1.將每次車程的 TaxiRide 和 TaxiFare 記錄連接在一起
2.對於每個不同的 rideId,恰好有三個事件:
TaxiRide START 事件
TaxiRide END 事件
一個 TaxiFare 事件(其時間戳恰好與開始時間匹配)
最終的結果應該是 DataStream<RideAndFare>,每個不同的 rideId 都產生一個 RideAndFare 記錄。 每個 RideAndFare 都應該將某個 rideId 的 TaxiRide START 事件與其匹配的 TaxiFare 配對。
案例流程
核心代碼
- connect 可以將兩個流連接成一個ConnectedStreams, 而且不要求兩個流的數據類型一致
// 從車程事件中過濾中車程開始時間,並按車程標識 rideId 分組
KeyedStream<TaxiRide, Long> rideStream = env.fromSource(rideSource, WatermarkStrategy.noWatermarks(), "ride source")
.filter(ride -> ride.getStart()).keyBy(TaxiRide::getRideId);
// 付車費事件按行程標識 rideId 分組
KeyedStream<TaxiFare, Long> fareStream = env.fromSource(fareSource, WatermarkStrategy.noWatermarks(), "fare source")
.keyBy(TaxiFare::getRideId);
rideStream.connect(fareStream).flatMap(new EnrichmentFunction())
.uid("enrichment") // uid for this operator's state
.name("enrichment") // name for this operator in the web UI
.addSink(new PrintSinkFunction<>());
- 使用ValueState保存事件狀態
public class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare> {
private ValueState<TaxiRide> taxiRideValueState;
private ValueState<TaxiFare> taxiFareValueState;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<TaxiRide> taxiRideDescriptor = new ValueStateDescriptor<TaxiRide>("save-ride", TaxiRide.class);
ValueStateDescriptor<TaxiFare> taxiFareDescriptor = new ValueStateDescriptor<TaxiFare>("save-fare", TaxiFare.class);
taxiRideValueState = getRuntimeContext().getState(taxiRideDescriptor);
taxiFareValueState = getRuntimeContext().getState(taxiFareDescriptor);
}
/**
* 當車程事件到來,檢查車費的taxiFareValueState是否保存有對應行程付費記錄
* 如果有,則匹配輸出,清空狀態
* 如果沒有,則將車程事件保存起來
*/
@Override
public void flatMap1(TaxiRide taxiRide, Collector<RideAndFare> collector) throws Exception {
TaxiFare taxiFare = taxiFareValueState.value();
if (Objects.isNull(taxiFare)) {
taxiRideValueState.update(taxiRide);
} else {
taxiFareValueState.clear();
RideAndFare rideAndFare = new RideAndFare();
rideAndFare.setRide(taxiRide);
rideAndFare.setFare(taxiFare);
collector.collect(rideAndFare);
}
}
/**
* 當付費事件到來,檢查車程的taxiRideValueState是否保存有對應行程車程記錄
* 如果有,則匹配輸出,清空狀態
* 如果沒有,則將付費事件保存起來
*/
@Override
public void flatMap2(TaxiFare taxiFare, Collector<RideAndFare> collector) throws Exception {
TaxiRide taxiRide = taxiRideValueState.value();
if (Objects.isNull(taxiRide)) {
taxiFareValueState.update(taxiFare);
} else {
taxiRideValueState.clear();
RideAndFare rideAndFare = new RideAndFare();
rideAndFare.setRide(taxiRide);
rideAndFare.setFare(taxiFare);
collector.collect(rideAndFare);
}
}
}
- 車程事件流和付費事件流來自Kafka
// 定義計程車-車程數據源
KafkaSource<TaxiRide> rideSource = KafkaSource.<TaxiRide>builder()
.setBootstrapServers("192.168.0.192:9092")
.setTopics("TOPIC_RIDE")
.setGroupId("TEST_GROUP")
.setClientIdPrefix("ride") // 避免kafka clientId重覆
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new TaxiRideDeserialization())
.build();
// 定義計程車-車費數據源
KafkaSource<TaxiFare> fareSource = KafkaSource.<TaxiFare>builder()
.setBootstrapServers("192.168.0.192:9092")
.setTopics("TOPIC_FARE")
.setGroupId("TEST_GROUP")
.setClientIdPrefix("fare") // 避免kafka clientId重覆
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new TaxiFareDeserialization())
.build();
事件格式:
1.車程事件: {"rideId":10086, "taxiId":1, "driverId":2, "isStart":true, "eventTime":1656571391726, "startLon":113.273031, "startLat":23.147103, "endLon":113.268245, "endLat":23.14445, "passengerCnt":1}
2.付費事件: {"rideId":10086, "taxiId":1, "driverId":2, "startTime":1656571391726, "paymentType":"CASH", "tip":0.00, "tolls":10.00, "totalFare":110.00}
完整代碼
https://github.com/Mr-LuXiaoHua/study-flink
程式入口: com.example.datastream.rideandfare.RideAndFareJob