Flink-計程車-車程事件流和付車費事件流connect

来源:https://www.cnblogs.com/luxh/archive/2022/06/30/16427196.html
-Advertisement-
Play Games

案例來源: https://github.com/apache/flink-training/blob/release-1.14/README_zh.md 案例背景 計程車車程(taxi ride)事件結構 1.每次車程都由兩個事件表示:行程開始(trip start)和行程結束(trip end) ...


案例來源: https://github.com/apache/flink-training/blob/release-1.14/README_zh.md

案例背景

計程車車程(taxi ride)事件結構
1.每次車程都由兩個事件表示:行程開始(trip start)和行程結束(trip end)。
2.每個事件都由十一個欄位組成:

rideId         : Long      // 每次車程的唯一id
taxiId         : Long      // 每一輛計程車的唯一id
driverId       : Long      // 每一位司機的唯一id
isStart        : Boolean   // 行程開始事件為 TRUE, 行程結束事件為 FALSE
eventTime      : Long      // 事件的時間戳
startLon       : Float     // 車程開始位置的經度
startLat       : Float     // 車程開始位置的維度
endLon         : Float     // 車程結束位置的經度
endLat         : Float     // 車程結束位置的維度
passengerCnt   : Short     // 乘車人數

計程車車費(taxi fare)事件結構
rideId         : Long      // 每次車程的唯一id
taxiId         : Long      // 每一輛計程車的唯一id
driverId       : Long      // 每一位司機的唯一id
startTime      : Long   // 車程開始時間
paymentType    : String    // 現金(CASH)或刷卡(CARD)
tip            : Float     // 小費
tolls          : Float     // 過路費
totalFare      : Float     // 總計車費

案例目標

1.將每次車程的 TaxiRide 和 TaxiFare 記錄連接在一起

2.對於每個不同的 rideId,恰好有三個事件:

TaxiRide START 事件
TaxiRide END 事件
一個 TaxiFare 事件(其時間戳恰好與開始時間匹配)

最終的結果應該是 DataStream<RideAndFare>,每個不同的 rideId 都產生一個 RideAndFare 記錄。 每個 RideAndFare 都應該將某個 rideId 的 TaxiRide START 事件與其匹配的 TaxiFare 配對。

案例流程

核心代碼

  • connect 可以將兩個流連接成一個ConnectedStreams, 而且不要求兩個流的數據類型一致
       // 從車程事件中過濾中車程開始時間,並按車程標識 rideId 分組
        KeyedStream<TaxiRide, Long> rideStream = env.fromSource(rideSource, WatermarkStrategy.noWatermarks(), "ride source")
                .filter(ride -> ride.getStart()).keyBy(TaxiRide::getRideId);

        // 付車費事件按行程標識 rideId 分組
        KeyedStream<TaxiFare, Long> fareStream = env.fromSource(fareSource, WatermarkStrategy.noWatermarks(), "fare source")
                .keyBy(TaxiFare::getRideId);

        rideStream.connect(fareStream).flatMap(new EnrichmentFunction())
                .uid("enrichment") // uid for this operator's state
                .name("enrichment") // name for this operator in the web UI
                .addSink(new PrintSinkFunction<>());
  • 使用ValueState保存事件狀態
public class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare> {

    private ValueState<TaxiRide> taxiRideValueState;
    private ValueState<TaxiFare> taxiFareValueState;


    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<TaxiRide> taxiRideDescriptor = new ValueStateDescriptor<TaxiRide>("save-ride", TaxiRide.class);
        ValueStateDescriptor<TaxiFare> taxiFareDescriptor = new ValueStateDescriptor<TaxiFare>("save-fare", TaxiFare.class);

        taxiRideValueState = getRuntimeContext().getState(taxiRideDescriptor);
        taxiFareValueState = getRuntimeContext().getState(taxiFareDescriptor);

    }


    /**
     * 當車程事件到來,檢查車費的taxiFareValueState是否保存有對應行程付費記錄
     * 如果有,則匹配輸出,清空狀態
     * 如果沒有,則將車程事件保存起來
     */
    @Override
    public void flatMap1(TaxiRide taxiRide, Collector<RideAndFare> collector) throws Exception {
        TaxiFare taxiFare = taxiFareValueState.value();
        if (Objects.isNull(taxiFare)) {
            taxiRideValueState.update(taxiRide);
        } else {
            taxiFareValueState.clear();

            RideAndFare rideAndFare = new RideAndFare();
            rideAndFare.setRide(taxiRide);
            rideAndFare.setFare(taxiFare);

            collector.collect(rideAndFare);
        }
    }


    /**
     * 當付費事件到來,檢查車程的taxiRideValueState是否保存有對應行程車程記錄
     * 如果有,則匹配輸出,清空狀態
     * 如果沒有,則將付費事件保存起來
     */
    @Override
    public void flatMap2(TaxiFare taxiFare, Collector<RideAndFare> collector) throws Exception {
        TaxiRide taxiRide = taxiRideValueState.value();
        if (Objects.isNull(taxiRide)) {
            taxiFareValueState.update(taxiFare);
        } else {
            taxiRideValueState.clear();

            RideAndFare rideAndFare = new RideAndFare();
            rideAndFare.setRide(taxiRide);
            rideAndFare.setFare(taxiFare);

            collector.collect(rideAndFare);

        }
    }
}

  • 車程事件流和付費事件流來自Kafka
       // 定義計程車-車程數據源
        KafkaSource<TaxiRide> rideSource = KafkaSource.<TaxiRide>builder()
                .setBootstrapServers("192.168.0.192:9092")
                .setTopics("TOPIC_RIDE")
                .setGroupId("TEST_GROUP")
                .setClientIdPrefix("ride") // 避免kafka clientId重覆
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new TaxiRideDeserialization())
                .build();

        // 定義計程車-車費數據源
        KafkaSource<TaxiFare> fareSource = KafkaSource.<TaxiFare>builder()
                .setBootstrapServers("192.168.0.192:9092")
                .setTopics("TOPIC_FARE")
                .setGroupId("TEST_GROUP")
                .setClientIdPrefix("fare") // 避免kafka clientId重覆
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new TaxiFareDeserialization())
                .build();

事件格式:

1.車程事件: {"rideId":10086, "taxiId":1, "driverId":2, "isStart":true, "eventTime":1656571391726, "startLon":113.273031, "startLat":23.147103, "endLon":113.268245, "endLat":23.14445, "passengerCnt":1}


2.付費事件: {"rideId":10086, "taxiId":1, "driverId":2, "startTime":1656571391726, "paymentType":"CASH", "tip":0.00, "tolls":10.00, "totalFare":110.00}

完整代碼

https://github.com/Mr-LuXiaoHua/study-flink

程式入口: com.example.datastream.rideandfare.RideAndFareJob

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在各種群里經常討論的一個事情是.NET 如何調用 Java 的實現,最常見的場景之一就是在加解密方面Java提供的密鑰,C#無法解密, C#中byte範圍是[0,255],而Java中的byte範圍是[-128,127],由於密碼生成器是java所獨有的,其它語言都不支持(IOS,ANDROID,C ...
  • MediaWPF 基於 .NET 6 實現視頻硬解碼渲染Demo(無空域問題) 代碼實現僅供學習參考 本項目視頻渲染通過顯卡進行視頻解碼,CPU幾乎不參與工作,並且不存在令人煩躁的空域問題。 在播放攝像頭多路視頻或高解析度、高幀率視頻時可以極大發揮顯卡性能(我認為該項目做到了這一點)。 支持各類網路 ...
  • 一、簡介 LVM(Logical Volume Manager)邏輯捲管理,是在硬碟分區和文件系統之間添加的一個邏輯層,為文件系統屏蔽下層硬碟分區佈局,並提供一個抽象的盤捲,在盤捲上建立文件系統。管理員利用LVM可以在硬碟不用重新分區的情況下動態調整文件系統的大小,並且利用LVM管理的文件系統可以跨 ...
  • 磁碟分區的好處: 優化IO性能、實現磁碟配額、隔離系統和程式、可以採用多個文件系統 分區的方式: MBR 比較傳統的分區 GPT MBR分區: MBR:Master Boot Record(主引導記錄),1982年開發而來,使用32位表示扇區數,分區不超過2T 特點:支持的分區空間有限 0磁軌0扇區 ...
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡雲開源鏡像站 一、下載安裝Anaconda 1.雙擊運行anaconda安裝包,點擊下一步 2.同意協議 3.這裡有兩個選項,假如你的電腦里有好幾個Users,可以選擇All Users,但一般來說我們電腦里只有一個User,所以預設勾選Just Me 4.選 ...
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡雲開源鏡像站 最近不知道為啥,每天晚上ubuntu系統都會自動進入休眠狀態,然後遠程連接就斷開了,還得等第二天到機子那按一下電源鍵來喚醒系統。可是回想起來自己也沒有修改過任何設置,之前電腦都是不會自動休眠的,這就很奇怪了。不過,個人猜測自動休眠也可能是由於系統 ...
  • 二、Linux系統的安裝 所需要的虛擬環境 VMware Workstation Pro 、redhat系統(iso文件) 首先打開VMware Workstation Pro 軟體,在左上角有“文件”按鈕,點擊“文件“按鈕有個“新建虛擬機”,點擊“新建虛擬機“創建一個新的虛擬機,進入新建虛擬機向 ...
  • 原文鏈接:基於開源流批一體數據同步引擎ChunJun數據還原—DDL解析模塊的實戰分享 課件獲取:關註公眾號** “數棧研習社”,後臺私信 “ChunJun”**獲得直播課件 視頻回放:點擊這裡 ChunJun開源項目地址:github 丨 gitee 喜歡我們的項目給我們點個 STAR!STAR! ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...