Kafka+Fink 實戰+工具類

来源:https://www.cnblogs.com/xietingwei/archive/2023/08/19/17643274.html
-Advertisement-
Play Games

- LogServiceImpl ``` @Service @Slf4j public class LogServiceImpl implements LogService { private static final String TOPIC_NAME = "ods_link_visit_topi ...


  • LogServiceImpl
@Service
@Slf4j
public class LogServiceImpl implements LogService {

    private static final String TOPIC_NAME = "ods_link_visit_topic";

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 記錄日誌
     *
     * @param request
     * @param shortLinkCode
     * @param accountNo
     * @return
     */
    @Override
    public void recodeShortLinkLog(HttpServletRequest request, String shortLinkCode, Long accountNo) {
        // ip、 瀏覽器信息
        String ip = CommonUtil.getIpAddr(request);
        // 全部請求頭
        Map<String, String> headerMap = CommonUtil.getAllRequestHeader(request);

        Map<String,String> availableMap = new HashMap<>();
        availableMap.put("user-agent",headerMap.get("user-agent"));
        availableMap.put("referer",headerMap.get("referer"));
        availableMap.put("accountNo",accountNo.toString());

        LogRecord logRecord = LogRecord.builder()
                //日誌類型
                .event(LogTypeEnum.SHORT_LINK_TYPE.name())
                //日誌內容
                .data(availableMap)
                //客戶端ip
                .ip(ip)
                // 時間
                .ts(CommonUtil.getCurrentTimestamp())
                //業務唯一標識(短鏈碼)
                .bizId(shortLinkCode).build();

        String jsonLog = JsonUtil.obj2Json(logRecord);

        //列印日誌 in 控制台
        log.info(jsonLog);

        // 發送kafka
        kafkaTemplate.send(TOPIC_NAME,jsonLog);


    }
}

  • DwdShortLinkLogApp
@Slf4j
public class DwdShortLinkLogApp {
    //定義 topic
    public static final String SOURCE_TOPIC = "ods_link_visit_topic";

    //定義 消費組
    public static final String SINK_TOPIC = "dwd_link_visit_topic";

    //定義 消費組
    public static final String GROUP_ID = "dwd_short_link_group";


    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

//        DataStream<String> ds = env.socketTextStream("192.168.75.146", 8888);

        FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(SOURCE_TOPIC, GROUP_ID);

        DataStreamSource<String> ds = env.addSource(kafkaConsumer);

        ds.print();

        SingleOutputStreamOperator<JSONObject> jsonDs = ds.flatMap(new FlatMapFunction<String, JSONObject>() {

            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                // 生成web端設備唯一標識
                String udid = getDeviceId(jsonObject);
                jsonObject.put("udid",udid);

                String referer = getReferer(jsonObject);
                jsonObject.put("referer",referer);

                out.collect(jsonObject);

            }
        });

        // 分組
        KeyedStream<JSONObject, String> keyedStream = jsonDs.keyBy(new KeySelector<JSONObject, String>() {

            @Override
            public String getKey(JSONObject value) throws Exception {
                return value.getString("udid");

            }
        });


        // 識別新老訪客    richMap open函數,對狀態以及日期格式進行初始化

        SingleOutputStreamOperator<String> jsonDSWithVisitorState = keyedStream.map(new VisitorMapFunction());

        jsonDSWithVisitorState.print("ods新老訪客");

        // 存儲到dwd
        FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(SINK_TOPIC);

        jsonDSWithVisitorState.addSink(kafkaProducer);


        env.execute();
    }

    /**
     * 獲取referer
     * @param jsonObject
     * @return
     */
    public static String getReferer(JSONObject jsonObject){
        JSONObject dataJsonObj = jsonObject.getJSONObject("data");
        if(dataJsonObj.containsKey("referer")){

            String referer = dataJsonObj.getString("referer");
            if(StringUtils.isNotBlank(referer)){
                try {
                    URL url = new URL(referer);
                    return url.getHost();
                } catch (MalformedURLException e) {
                    log.error("提取referer失敗:{}",e.toString());
                }
            }
        }

        return "";

    }

    /**
     * 生成設備唯一標識
     *
     * @param jsonObject
     * @return
     */
    public static String getDeviceId(JSONObject jsonObject){
        Map<String,String> map= new TreeMap<>();

        try{
            map.put("ip",jsonObject.getString("ip"));
            map.put("event",jsonObject.getString("event"));
            map.put("bizId",jsonObject.getString("bizId"));
            map.put("userAgent",jsonObject.getJSONObject("data").getString("userAgent"));

            return DeviceUtil.geneWebUniqueDeviceId(map);

        }catch (Exception e){
            log.error("生產唯一deviceId異常:{}", jsonObject);
            return null;
        }


    }


}

  • KafkaUtil

    @Slf4j
    public class KafkaUtil {
    
        /**
         * kafka 的 broker 地址
         */
        private static String KAFKA_SERVER = null;
    
        static {
            Properties properties = new Properties();
    
            InputStream in = KafkaUtil.class.getClassLoader().getResourceAsStream("application.properties");
    
            try {
                properties.load(in);
            } catch (IOException e) {
                e.printStackTrace();
                log.error("載入Kafka配置文件失敗:{}",e.getMessage());
            }
    
            //獲取配置文件中的value
            KAFKA_SERVER = properties.getProperty("kafka.servers");
    
        }
    
        /**
         * 獲取flink的kafka消費者
         * @param topic
         * @param groupId
         * @return
         */
        public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic,String groupId){
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
    
            return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),properties);
        }
    
        /**
         * 獲取flink的kafka生產者
         * @param topic
         * @return
         */
        public static FlinkKafkaProducer<String> getKafkaProducer(String topic){
            return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());
        }
    }
    
    
  • TimeUtil

    public class TimeUtil {
    
        /**
         * 預設日期格式
         */
        private static final String DEFAULT_PATTERN = "yyyy-MM-dd";
    
        /**
         * 預設日期格式
         */
        private static final DateTimeFormatter DEFAULT_DATE_TIME_FORMATTER  = DateTimeFormatter.ofPattern(DEFAULT_PATTERN);
    
        private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();
    
    
        /**
         * LocalDateTime 轉 字元串,指定日期格式
         * @param localDateTime
         * @param pattern
         * @return
         */
        public static String format(LocalDateTime localDateTime, String pattern){
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
            String timeStr = formatter.format(localDateTime.atZone(DEFAULT_ZONE_ID));
            return timeStr;
        }
    
    
        /**
         * Date 轉 字元串, 指定日期格式
         * @param time
         * @param pattern
         * @return
         */
        public static String format(Date time, String pattern){
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
            String timeStr = formatter.format(time.toInstant().atZone(DEFAULT_ZONE_ID));
            return timeStr;
        }
    
        /**
         *  Date 轉 字元串,預設日期格式
         * @param time
         * @return
         */
        public static String format(Date time){
    
            String timeStr = DEFAULT_DATE_TIME_FORMATTER.format(time.toInstant().atZone(DEFAULT_ZONE_ID));
            return timeStr;
        }
    
        /**
         * timestamp 轉 字元串,預設日期格式
         *
         * @param timestamp
         * @return
         */
        public static String format(long timestamp) {
            String timeStr = DEFAULT_DATE_TIME_FORMATTER.format(new Date(timestamp).toInstant().atZone(DEFAULT_ZONE_ID));
            return timeStr;
        }
    
    
        /**
         * 字元串 轉 Date
         *
         * @param time
         * @return
         */
        public static Date strToDate(String time) {
            LocalDateTime localDateTime = LocalDateTime.parse(time, DEFAULT_DATE_TIME_FORMATTER);
            return Date.from(localDateTime.atZone(DEFAULT_ZONE_ID).toInstant());
    
        }
    
    }
    

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言 筆者之前開發過一套C/S架構的桌面應用,採用了JWT作為用戶的登錄認證和授權。遇到的唯一問題就是JWT過期了該怎麼辦?設想當一個用戶正在進行業務操作,突然因為Token過期失效,莫名其妙地跳轉到登錄界面,是不是一件很無語的事。當然筆者也曾想過:為何不把JWT的有效期儘量設長些(假設24小時), ...
  • # Unity 如何獲取Texture 的記憶體大小 在Unity中,要獲取Texture的記憶體文件大小,可以使用UnityEditor.TextureUtil類中的一些函數。這些函數提供了獲取存儲記憶體大小和運行時記憶體大小的方法。由於UnityEditor.TextureUtil是一個內部類,我們需要 ...
  • 為什麼需要設備驅動模型 內核版本發展 2.4版本之前內核沒有統一的設備驅動模型,但是可以用(例如先前的led字元設備驅動實驗,使用前需要手動調用mknod命令創建設備文件,從而進一步控制硬體)。 2.4~2.6版本內核使用devfs,掛載在/dev目錄。需要在內核驅動中創建設備文件(調用devfs_ ...
  • 轉載請標明出處,維權必究: https://www.cnblogs.com/tangZH/p/12900387.html Glide源碼解析一,初始化 Glide源碼解析二—into方法 Glide源碼解析三(註冊組件) Glide源碼解析四(解碼和轉碼) Glide自定義組件註冊 通過Glide加 ...
  • > Vue2.x使用EventBus進行組件通信,而Vue3.x推薦使用`mitt.js`。 > > > 比起Vue實例上的`EventBus`,`mitt.js`好在哪裡呢?首先它足夠小,僅有200bytes,其次支持全部事件的監聽和批量移除,它還不依賴Vue實例,所以可以跨框架使用,React或 ...
  • ![](https://img2023.cnblogs.com/blog/3076680/202308/3076680-20230817140634376-621525736.png) # 1. 康威定律 ## 1.1. 梅爾文·康威 ### 1.1.1. Melvin Conway ### 1.1 ...
  • 這篇文章總結了常用的架構圖類型,可以借鑒筆者提供的模板,快速地產出符合業務需要的架構圖。 為什麼要畫好一幅架構圖?一幅漂亮的架構圖既是創作者的深度結構化思考和表達,對於讀者來說也更加容易理解架構所要表達的意思。 然而不擅長畫圖的程式員,在大腦里已經有了思路,如何快速能夠產出精美的架構圖呢?這篇文章幫 ...
  • [TOC] # 本篇前瞻 歡迎來go語言的基礎篇,這裡會幫你梳理一下go語言的基本類型,註意本篇有參考[go聖經](https://gopl-zh.github.io/),如果你有完整學習的需求可以看一下。另外,go語言的基本類型比較簡單,介紹過程就比較粗暴,不過我們需要先從一個例題開始。 # Le ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...