Flink-基於 DataStream API 實現欺詐檢測

来源:https://www.cnblogs.com/luxh/archive/2022/06/29/16424527.html
-Advertisement-
Play Games

案例來源於 https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/datastream/ 案例背景 在當今數字時代,信用卡欺詐行為越來越被重視。 罪犯可以通過詐騙或者入侵安全級別較低系統來盜竊信用卡卡號。 ...


案例來源於 https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/datastream/

案例背景

在當今數字時代,信用卡欺詐行為越來越被重視。 罪犯可以通過詐騙或者入侵安全級別較低系統來盜竊信用卡卡號。 用盜得的信用卡進行很小額度的例如一美元或者更小額度的消費進行測試。 如果測試消費成功,那麼他們就會用這個信用卡進行大筆消費,來購買一些他們希望得到的,或者可以倒賣的財物。

在這個教程中,你將會建立一個針對可疑信用卡交易行為的反欺詐檢測系統。 通過使用一組簡單的規則,你將瞭解到 Flink 如何為我們實現複雜業務邏輯並實時執行。

欺詐檢測規則

  1. 對於一個賬戶,如果出現一筆小於1元的交易後, 緊跟著在1分鐘內又出現一筆大於500元的交易,則認為該賬戶屬於欺詐,就輸出一個報警消息。
  2. 圖說明如下

對原有案例進行改造

1. 數據源使用Kafka,發送json格式字元串
消息格式:  {"accountId":1001, "timestamp":1656490723171, "amount":0.12}

2. 自定義 DeserializationSchema, 直接將kafka的json字元串轉成POJO對象

流程圖

核心代碼

  1. 自定義DeserializationSchema
public class TransactionDeserialization implements DeserializationSchema<Transaction> {
    @Override
    public Transaction deserialize(byte[] bytes) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        String message = byteBufferToString(buffer);
        if (StringUtils.isBlank(message)) {
            return null;
        }
        Transaction transaction = JsonUtils.fromJson(message, Transaction.class);
        return transaction;
    }

    @Override
    public boolean isEndOfStream(Transaction transaction) {
        return false;
    }

    @Override
    public TypeInformation<Transaction> getProducedType() {
        return TypeInformation.of(Transaction.class);
    }



    /**
     * ByteBuffer 轉換 String
     * @param buffer
     * @return
     */
    private String byteBufferToString(ByteBuffer buffer) {
        String ret = "";
        try{
            CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
            CharBuffer charBuffer = decoder.decode(buffer.asReadOnlyBuffer());;
            ret = charBuffer.toString();
        }catch (Exception e) {
            e.printStackTrace();
        }
        return ret;
    }
}
  1. 欺詐檢測核心代碼
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    /**
     * 定義小金額邊界
     */
    private static final double SMALL_AMOUNT = 1.00;

    /**
     * 定義大金額邊界
     */
    private static final double LARGE_AMOUNT = 500.00;

    /**
     * 1分鐘時間
     */
    private static final long ONE_MINUTE = 60 * 1000;

    /**
     * 保存是否有消費小金額的狀態
     */
    private transient ValueState<Boolean> smallAmountState;

    /**
     * 定時器狀態
     */
    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 初始化ValueState

        ValueStateDescriptor<Boolean> smallAmountStateDescriptor = new ValueStateDescriptor<Boolean>("small-amount-state", Types.BOOLEAN);
        smallAmountState = getRuntimeContext().getState(smallAmountStateDescriptor);

        ValueStateDescriptor<Long> timerStateDescriptor = new ValueStateDescriptor<Long>("timer-state", Types.LONG);
        timerState = getRuntimeContext().getState(timerStateDescriptor);

    }

    @Override
    public void processElement(Transaction transaction, Context context, Collector<Alert> collector) throws Exception {
        if (Objects.isNull(transaction)) {
            return;
        }
        // Get the current state for the current key
        Boolean lastTransactionWasSmall = smallAmountState.value();

        // Check if the flag is set
        if (Objects.nonNull(lastTransactionWasSmall)) {
            if (transaction.getAmount() > LARGE_AMOUNT) {
                Alert alert = new Alert();
                alert.setAccountId(transaction.getAccountId());
                alert.setAmount(transaction.getAmount());

                collector.collect(alert);
            }
            clearUp(context);
        }

        if (transaction.getAmount() < SMALL_AMOUNT) {
            // set the flag to true
            smallAmountState.update(true);

            // 註冊定時器,設置一個當前時間一分鐘後觸發的定時器,同時,將觸發時間保存到 timerState 狀態中。
            long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
            context.timerService().registerProcessingTimeTimer(timer);
            timerState.update(timer);
        }

    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
        // remove flag after 1 minute
        timerState.clear();
        smallAmountState.clear();
    }

    private void clearUp(Context ctx) {
        try {
            // delete timer
            Long timer = timerState.value();
            ctx.timerService().deleteProcessingTimeTimer(timer);

            timerState.clear();
            smallAmountState.clear();

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
  1. FLink Job 啟動類
public class FraudDetectionJob {
    public static void main(String[] args) throws Exception {
        // 初始化環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // kafka消息格式: {"accountId":1001, "timestamp":1656490723171, "amount":0.12}

        // 定義Kafka數據源
        KafkaSource<Transaction> source = KafkaSource.<Transaction>builder()
                .setBootstrapServers("192.168.0.192:9092")
                .setTopics("TOPIC_FRAUD_DETECTION")
                .setGroupId("TEST_GROUP")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new TransactionDeserialization())
                .build();

        // 載入數據源
        DataStreamSource<Transaction> fraudDetectionSource
                = env.fromSource(source, WatermarkStrategy.noWatermarks(), "FraudDetection-Source");

        // 處理數據
        SingleOutputStreamOperator<Alert> alertStreamOperator = fraudDetectionSource.keyBy(Transaction::getAccountId)
                .process(new FraudDetector())
                .name("Fraud-Detector");

        // 輸出告警結果
        alertStreamOperator.addSink(new AlertSink())
                .name("Send-Alerts");

        env.execute("Fraud Detection");

    }
}

執行效果

  1. kafka輸入

  2. 告警結果

完整代碼

https://github.com/Mr-LuXiaoHua/study-flink

代碼入口: com.example.datastream.frauddetection.FraudDetectionJob

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 中移OneOS開發板學習入門 (做為一個專業的點燈愛好者,學習任何開發板開始前都是先找好學習資料,然後在學習OneOS的內核部分,外設部分,系統組件等) OneOS OneOS是中國移動針對物聯網領域推出的輕量級操作系統,具有可裁剪、跨平臺、低功耗、高安全等特點,支持ARM Cortex-A和 Co ...
  • 備份資料庫 問題描述: ​ 我們用的是mysql,以今天遇到的情況為例,我們是在兩台伺服器上要搭相同的平臺,部署完成後頁面報錯,發現是資料庫的問題,我們打開資料庫查看,確實資料庫中少建一個wind資料庫,但是我們沒有建這個資料庫的腳本,資料庫裡面涉及到很多表,很複雜,於是採用linux備份的方法,成 ...
  • 通過 hosts文件配置本地功能變數名稱 概念 DNS: 功能變數名稱系統(Domain Name System),是互聯網的一項服務。它作為將功能變數名稱和IP地址相互映射的一個分散式資料庫,能夠使人更方便地訪問互聯網。 將功能變數名稱映射到對應的IP地址。 互聯網通過IP定位瀏覽器建立連接,但是我們不易區別IP,為了方便用戶辨 ...
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡雲開源鏡像站 作為一個專業小白,咱啥都不懂。 linux不懂,docker不懂。 但是我還想要完成領導下達的任務:在linux中安裝docker後部署數據可視化工具。作為一名敬業 的打工人擺爛不可以,躺平不可以,弱小,無助,可憐中。。這能力,這要去,要不直接散 ...
  • 編寫MBR主引導記錄,開始掌權 一.一些說明 CPU的硬體電路被設計成只能運行處於記憶體中的程式,這是硬體基因的問題,其原因是首先記憶體比較快且容量大,其次由於各個硬體特性不同,若被設計成運行硬體里的程式則操作系統要分別考慮每種硬體特性才行,為了達到統一,故選擇只運行記憶體中的程式。其次記憶體不僅僅是DRA ...
  • 桌面右鍵之後,點擊新建,感覺出來了的內容太雜亂了,看強迫症犯了,如圖: 強迫症患者,表示不能忍 1. win + R打開註冊表 2.在註冊表上複製: HKEY_CURRENT_USER\Software\Microsoft\Windows\CurrentVersion\Explorer\Discar ...
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡雲開源鏡像站 MariaDB的安裝與配置 (菜雞我是用阿裡源安裝的) MariaDB是MySQL的一個分支,由開源社區維護,採用GPL授權許可,完全相容MySQL. 1.安裝相對應的源 vi /etc/yum.repos.d/MariaDB.repo # 填寫 ...
  • 使用磁碟空間過程 分區:毛坯房 格式化:配置創建文件系統(裝修) 掛載:把分區和文件系統進行組裝 Linux一切皆文件:通過文件的方式來管理磁碟 linux一切皆文件,所以磁碟也表現為文件。每個硬碟文件命名方式和磁碟的類型有關。硬碟文件在內核中自動生成識別,並存放在/dev這個文件下麵。 lvm:邏 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...