案例來源於 https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/datastream/ 案例背景 在當今數字時代,信用卡欺詐行為越來越被重視。 罪犯可以通過詐騙或者入侵安全級別較低系統來盜竊信用卡卡號。 ...
案例來源於 https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/datastream/
案例背景
在當今數字時代,信用卡欺詐行為越來越被重視。 罪犯可以通過詐騙或者入侵安全級別較低系統來盜竊信用卡卡號。 用盜得的信用卡進行很小額度的例如一美元或者更小額度的消費進行測試。 如果測試消費成功,那麼他們就會用這個信用卡進行大筆消費,來購買一些他們希望得到的,或者可以倒賣的財物。
在這個教程中,你將會建立一個針對可疑信用卡交易行為的反欺詐檢測系統。 通過使用一組簡單的規則,你將瞭解到 Flink 如何為我們實現複雜業務邏輯並實時執行。
欺詐檢測規則
- 對於一個賬戶,如果出現一筆小於1元的交易後, 緊跟著在1分鐘內又出現一筆大於500元的交易,則認為該賬戶屬於欺詐,就輸出一個報警消息。
- 圖說明如下
對原有案例進行改造
1. 數據源使用Kafka,發送json格式字元串
消息格式: {"accountId":1001, "timestamp":1656490723171, "amount":0.12}
2. 自定義 DeserializationSchema, 直接將kafka的json字元串轉成POJO對象
流程圖
核心代碼
- 自定義DeserializationSchema
public class TransactionDeserialization implements DeserializationSchema<Transaction> {
@Override
public Transaction deserialize(byte[] bytes) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
String message = byteBufferToString(buffer);
if (StringUtils.isBlank(message)) {
return null;
}
Transaction transaction = JsonUtils.fromJson(message, Transaction.class);
return transaction;
}
@Override
public boolean isEndOfStream(Transaction transaction) {
return false;
}
@Override
public TypeInformation<Transaction> getProducedType() {
return TypeInformation.of(Transaction.class);
}
/**
* ByteBuffer 轉換 String
* @param buffer
* @return
*/
private String byteBufferToString(ByteBuffer buffer) {
String ret = "";
try{
CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
CharBuffer charBuffer = decoder.decode(buffer.asReadOnlyBuffer());;
ret = charBuffer.toString();
}catch (Exception e) {
e.printStackTrace();
}
return ret;
}
}
- 欺詐檢測核心代碼
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
/**
* 定義小金額邊界
*/
private static final double SMALL_AMOUNT = 1.00;
/**
* 定義大金額邊界
*/
private static final double LARGE_AMOUNT = 500.00;
/**
* 1分鐘時間
*/
private static final long ONE_MINUTE = 60 * 1000;
/**
* 保存是否有消費小金額的狀態
*/
private transient ValueState<Boolean> smallAmountState;
/**
* 定時器狀態
*/
private transient ValueState<Long> timerState;
@Override
public void open(Configuration parameters) throws Exception {
// 初始化ValueState
ValueStateDescriptor<Boolean> smallAmountStateDescriptor = new ValueStateDescriptor<Boolean>("small-amount-state", Types.BOOLEAN);
smallAmountState = getRuntimeContext().getState(smallAmountStateDescriptor);
ValueStateDescriptor<Long> timerStateDescriptor = new ValueStateDescriptor<Long>("timer-state", Types.LONG);
timerState = getRuntimeContext().getState(timerStateDescriptor);
}
@Override
public void processElement(Transaction transaction, Context context, Collector<Alert> collector) throws Exception {
if (Objects.isNull(transaction)) {
return;
}
// Get the current state for the current key
Boolean lastTransactionWasSmall = smallAmountState.value();
// Check if the flag is set
if (Objects.nonNull(lastTransactionWasSmall)) {
if (transaction.getAmount() > LARGE_AMOUNT) {
Alert alert = new Alert();
alert.setAccountId(transaction.getAccountId());
alert.setAmount(transaction.getAmount());
collector.collect(alert);
}
clearUp(context);
}
if (transaction.getAmount() < SMALL_AMOUNT) {
// set the flag to true
smallAmountState.update(true);
// 註冊定時器,設置一個當前時間一分鐘後觸發的定時器,同時,將觸發時間保存到 timerState 狀態中。
long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
context.timerService().registerProcessingTimeTimer(timer);
timerState.update(timer);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
// remove flag after 1 minute
timerState.clear();
smallAmountState.clear();
}
private void clearUp(Context ctx) {
try {
// delete timer
Long timer = timerState.value();
ctx.timerService().deleteProcessingTimeTimer(timer);
timerState.clear();
smallAmountState.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}
- FLink Job 啟動類
public class FraudDetectionJob {
public static void main(String[] args) throws Exception {
// 初始化環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// kafka消息格式: {"accountId":1001, "timestamp":1656490723171, "amount":0.12}
// 定義Kafka數據源
KafkaSource<Transaction> source = KafkaSource.<Transaction>builder()
.setBootstrapServers("192.168.0.192:9092")
.setTopics("TOPIC_FRAUD_DETECTION")
.setGroupId("TEST_GROUP")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new TransactionDeserialization())
.build();
// 載入數據源
DataStreamSource<Transaction> fraudDetectionSource
= env.fromSource(source, WatermarkStrategy.noWatermarks(), "FraudDetection-Source");
// 處理數據
SingleOutputStreamOperator<Alert> alertStreamOperator = fraudDetectionSource.keyBy(Transaction::getAccountId)
.process(new FraudDetector())
.name("Fraud-Detector");
// 輸出告警結果
alertStreamOperator.addSink(new AlertSink())
.name("Send-Alerts");
env.execute("Fraud Detection");
}
}
執行效果
-
kafka輸入
-
告警結果
完整代碼
https://github.com/Mr-LuXiaoHua/study-flink
代碼入口: com.example.datastream.frauddetection.FraudDetectionJob