以前的數據分析項目(版本1.4.2),對從 讀取的原始數據流,調用 介面實現分流. 新項目決定使用 ,使用 介面進行分流的時候,發現介面被標記為 (後續可能會被移除). 搜索相關文檔,發現新版本 中推薦使用帶外數據進行分流. 預先建立 實例( 是從 讀取的日誌實例類). 對 讀取的原始數據,通過 接 ...
以前的數據分析項目(版本1.4.2),對從Kafka
讀取的原始數據流,調用split
介面實現分流.
新項目決定使用Flink 1.7.2
,使用split
介面進行分流的時候,發現介面被標記為depracted
(後續可能會被移除).
搜索相關文檔,發現新版本Flink
中推薦使用帶外數據進行分流.
預先建立OutputTag
實例(LogEntity
是從kafka
讀取的日誌實例類).
private static final OutputTag<LogEntity> APP_LOG_TAG = new OutputTag<>("appLog", TypeInformation.of(LogEntity.class));
private static final OutputTag<LogEntity> ANALYZE_METRIC_TAG = new OutputTag<>("analyzeMetricLog", TypeInformation.of(LogEntity.class));
對kafka
讀取的原始數據,通過process
介面,打上相應標記.
private static SingleOutputStreamOperator<LogEntity> sideOutStream(DataStream<LogEntity> rawLogStream) {
return rawLogStream
.process(new ProcessFunction<LogEntity, LogEntity>() {
@Override
public void processElement(LogEntity entity, Context ctx, Collector<LogEntity> out) throws Exception {
// 根據日誌等級,給對象打上不同的標記
if (entity.getLevel().equals(ANALYZE_LOG_LEVEL)) {
ctx.output(ANALYZE_METRIC_TAG, entity);
} else {
ctx.output(APP_LOG_TAG, entity);
}
}
})
.name("RawLogEntitySplitStream");
}
// 調用函數,對原始數據流中的對象進行標記
SingleOutputStreamOperator<LogEntity> sideOutLogStream = sideOutStream(rawLogStream);
// 根據標記,獲取不同的數據流,以便後續進行進一步分析
DataStream<LogEntity> appLogStream = sideOutLogStream.getSideOutput(APP_LOG_TAG);
DataStream<LogEntity> rawAnalyzeMetricLogStream = sideOutLogStream.getSideOutput(ANALYZE_METRIC_TAG);
通過以上步驟,就實現了數據流的切分.
PS:
如果您覺得我的文章對您有幫助,可以掃碼領取下紅包或掃碼支持(隨意多少,一分錢都是愛),謝謝!
支付寶紅包 | 支付寶 | 微信 |
---|---|---|