[Spring cloud 一步步實現廣告系統] 16. 增量索引實現以及投送數據到MQ(kafka)

来源:https://www.cnblogs.com/zhangpan1244/archive/2019/08/10/11333229.html
-Advertisement-
Play Games

實現增量數據索引 上一節中,我們為實現增量索引的載入做了充足的準備,使用到 開源組件來實現MySQL 的binlog監聽,關於binlog的相關知識,大家可以自行網路查閱。或者可以 本節我們將根據binlog 的數據對象,來實現增量數據的處理,我們構建廣告的增量數據,其實說白了就是為了在後期能把廣告 ...


實現增量數據索引

上一節中,我們為實現增量索引的載入做了充足的準備,使用到mysql-binlog-connector-java 開源組件來實現MySQL 的binlog監聽,關於binlog的相關知識,大家可以自行網路查閱。或者可以mailto:[email protected]

本節我們將根據binlog 的數據對象,來實現增量數據的處理,我們構建廣告的增量數據,其實說白了就是為了在後期能把廣告投放到索引服務,實現增量數據到增量索引的生成。Let's code.

  • 定義一個投遞增量數據的介面(接收參數為我們上一節定義的binlog日誌的轉換對象)
/**
 * ISender for 投遞增量數據 方法定義介面
 *
 * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
 */
public interface ISender {

    void sender(MysqlRowData rowData);
}
  • 創建增量索引監聽器
/**
 * IncrementListener for 增量數據實現監聽
 *
 * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
 * @since 2019/6/27
 */
@Slf4j
@Component
public class IncrementListener implements Ilistener {

    private final AggregationListener aggregationListener;

    @Autowired
    public IncrementListener(AggregationListener aggregationListener) {
        this.aggregationListener = aggregationListener;
    }

    //根據名稱選擇要註入的投遞方式
    @Resource(name = "indexSender")
    private ISender sender;

    /**
     * 標註為 {@link PostConstruct},
     * 即表示在服務啟動,Bean完成初始化之後,立刻初始化
     */
    @Override
    @PostConstruct
    public void register() {
        log.info("IncrementListener register db and table info.");
        Constant.table2db.forEach((tb, db) -> aggregationListener.register(db, tb, this));
    }

    @Override
    public void onEvent(BinlogRowData eventData) {
        TableTemplate table = eventData.getTableTemplate();
        EventType eventType = eventData.getEventType();

        //包裝成最後需要投遞的數據
        MysqlRowData rowData = new MysqlRowData();
        rowData.setTableName(table.getTableName());
        rowData.setLevel(eventData.getTableTemplate().getLevel());
        //將EventType轉為OperationTypeEnum
        OperationTypeEnum operationType = OperationTypeEnum.convert(eventType);
        rowData.setOperationTypeEnum(operationType);

        //獲取模版中該操作對應的欄位列表
        List<String> fieldList = table.getOpTypeFieldSetMap().get(operationType);
        if (null == fieldList) {
            log.warn("{} not support for {}.", operationType, table.getTableName());
            return;
        }

        for (Map<String, String> afterMap : eventData.getAfter()) {
            Map<String, String> _afterMap = new HashMap<>();
            for (Map.Entry<String, String> entry : afterMap.entrySet()) {
                String colName = entry.getKey();
                String colValue = entry.getValue();

                _afterMap.put(colName, colValue);
            }

            rowData.getFieldValueMap().add(_afterMap);
        }
        sender.sender(rowData);
    }
}
開啟binlog監聽
  • 首先來配置監聽binlog的資料庫連接信息
adconf:
  mysql:
    host: 127.0.0.1
    port: 3306
    username: root
    password: 12345678
    binlogName: ""
    position: -1 # 從當前位置開始監聽

編寫配置類:

/**
 * BinlogConfig for 定義監聽Binlog的配置信息
 *
 * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
 */
@Component
@ConfigurationProperties(prefix = "adconf.mysql")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BinlogConfig {
    private String host;
    private Integer port;
    private String username;
    private String password;
    private String binlogName;
    private Long position;
}

在我們實現 監聽binlog那節,我們實現了一個自定義client CustomBinlogClient,需要實現binlog的監聽,這個監聽的客戶端就必須是一個獨立運行的線程,並且要在程式啟動的時候進行監聽,我們來實現運行當前client的方式,這裡我們會使用到一個新的Runnerorg.springframework.boot.CommandLineRunner,let's code.

@Slf4j
@Component
public class BinlogRunner implements CommandLineRunner {

    @Autowired
    private CustomBinlogClient binlogClient;

    @Override
    public void run(String... args) throws Exception {
        log.info("BinlogRunner is running...");
        binlogClient.connect();
    }
}
增量數據投遞

在binlog監聽的過程中,我們看到針對於int, String 這類數據欄位,mysql的記錄是沒有問題的,但是針對於時間類型,它被格式化成了字元串類型:Fri Jun 21 15:07:53 CST 2019

--------Insert-----------
WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
[10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
--------Update-----------
UpdateRowsEventData{tableId=81, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5}, includedColumns={0, 1, 2, 3, 4, 5}, rows=[
    {before=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 0, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019], after=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 1, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019]}

對於這個時間格式,我們需要關註2點信息:

  • CST,這個時間格式會比我們的時間+ 8h(中國標準時間 China Standard Time UT+8:00)
  • 需要對這個日期進行解釋處理

當然,我們也可以通過設置mysql的日期格式來改變該行為,在此,我們通過編碼來解析該時間格式:

  /**
   * Thu Jun 27 08:00:00 CST 2019
   */
  public static Date parseBinlogString2Date(String dateString) {
      try {
          DateFormat dateFormat = new SimpleDateFormat(
                  "EEE MMM dd HH:mm:ss zzz yyyy",
                  Locale.US
          );
          return DateUtils.addHours(dateFormat.parse(dateString), -8);

      } catch (ParseException ex) {
          log.error("parseString2Date error:{}", dateString);
          return null;
      }
  }

因為我們在定義索引的時候,是根據表之間的層級關係(Level)來設定的,根據代碼規範,不允許出現Magic Number, 因此我們定義一個數據層級枚舉,來表達數據層級。

/**
 * AdDataLevel for 廣告數據層級
 *
 * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
 */
@Getter
public enum AdDataLevel {

    LEVEL2("2", "level 2"),
    LEVEL3("3", "level 3"),
    LEVEL4("4", "level 4");

    private String level;
    private String desc;

    AdDataLevel(String level, String desc) {
        this.level = level;
        this.desc = desc;
    }
}
實現數據投遞

因為增量數據可以投遞到不同的位置以及用途,我們之前實現了一個投遞介面com.sxzhongf.ad.sender.ISender,接下來我們實現一個投遞類:

@Slf4j
@Component("indexSender")
public class IndexSender implements ISender {

    /**
     * 根據廣告級別,投遞Binlog數據
     */
    @Override
    public void sender(MysqlRowData rowData) {
        if (AdDataLevel.LEVEL2.getLevel().equals(rowData.getLevel())) {
            Level2RowData(rowData);
        } else if (AdDataLevel.LEVEL3.getLevel().equals(rowData.getLevel())) {
            Level3RowData(rowData);
        } else if (AdDataLevel.LEVEL4.getLevel().equals(rowData.getLevel())) {
            Level4RowData(rowData);
        } else {
            log.error("Binlog MysqlRowData error: {}", JSON.toJSONString(rowData));
        }
    }

    private void Level2RowData(MysqlRowData rowData) {

        if (rowData.getTableName().equals(Constant.AD_PLAN_TABLE_INFO.TABLE_NAME)) {
            List<AdPlanTable> planTables = new ArrayList<>();

            for (Map<String, String> fieldValueMap : rowData.getFieldValueMap()) {
                AdPlanTable planTable = new AdPlanTable();
                //Map的第二種迴圈方式
                fieldValueMap.forEach((k, v) -> {
                    switch (k) {
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_ID:
                            planTable.setPlanId(Long.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_USER_ID:
                            planTable.setUserId(Long.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_STATUS:
                            planTable.setPlanStatus(Integer.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_START_DATE:
                            planTable.setStartDate(CommonUtils.parseBinlogString2Date(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_END_DATE:
                            planTable.setEndDate(CommonUtils.parseBinlogString2Date(v));
                            break;
                    }
                });
                planTables.add(planTable);
            }

            //投遞推廣計劃
            planTables.forEach(p -> AdLevelDataHandler.handleLevel2Index(p, rowData.getOperationTypeEnum()));
        } else if (rowData.getTableName().equals(Constant.AD_CREATIVE_TABLE_INFO.TABLE_NAME)) {
            List<AdCreativeTable> creativeTables = new LinkedList<>();

            rowData.getFieldValueMap().forEach(afterMap -> {
                AdCreativeTable creativeTable = new AdCreativeTable();
                afterMap.forEach((k, v) -> {
                    switch (k) {
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_CREATIVE_ID:
                            creativeTable.setAdId(Long.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_TYPE:
                            creativeTable.setType(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_MATERIAL_TYPE:
                            creativeTable.setMaterialType(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_HEIGHT:
                            creativeTable.setHeight(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_WIDTH:
                            creativeTable.setWidth(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_AUDIT_STATUS:
                            creativeTable.setAuditStatus(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_URL:
                            creativeTable.setAdUrl(v);
                            break;
                    }
                });
                creativeTables.add(creativeTable);
            });

            //投遞廣告創意
            creativeTables.forEach(c -> AdLevelDataHandler.handleLevel2Index(c, rowData.getOperationTypeEnum()));
        }
    }

    private void Level3RowData(MysqlRowData rowData) {
       ...
    }

    /**
     * 處理4級廣告
     */
    private void Level4RowData(MysqlRowData rowData) {
        ...
    }
}
投放增量數據到MQ(kafka)

為了我們的數據投放更加靈活,方便數據統計,分析等系統的需求,我們來實現一個投放到消息中的介面,其他服務可以訂閱當前MQ 的TOPIC來實現數據訂閱。

配置文件中配置TOPIC
adconf:
  kafka:
    topic: ad-search-mysql-data

--------------------------------------
/**
 * KafkaSender for 投遞Binlog增量數據到kafka消息隊列
 *
 * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
 * @since 2019/7/1
 */
@Component(value = "kafkaSender")
public class KafkaSender implements ISender {

    @Value("${adconf.kafka.topic}")
    private String topic;

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 發送數據到kafka隊列
     */
    @Override
    public void sender(MysqlRowData rowData) {
        kafkaTemplate.send(
                topic, JSON.toJSONString(rowData)
        );
    }

    /**
     * 測試消費kafka消息
     */
    @KafkaListener(topics = {"ad-search-mysql-data"}, groupId = "ad-search")
    public void processMysqlRowData(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMsg = Optional.ofNullable(record.value());
        if (kafkaMsg.isPresent()) {
            Object message = kafkaMsg.get();
            MysqlRowData rowData = JSON.parseObject(
                    message.toString(),
                    MysqlRowData.class
            );
            System.out.println("kafka process MysqlRowData: " + JSON.toJSONString(rowData));
            //sender.sender();
        }

    }
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在HBuilderX生成的文檔中,還有一個“manifest.json”,只要是創建“移動App”應用,都會在工程下生成這個文件,一看擴展名就知道他是一個json格式文件,文件文件根據w3c的webapp規範制定,是5+移動App的配置文件,用於指定應用的顯示名稱、圖標、應用入口文件地址及需要使用的 ...
  • 恢復內容開始 1.日曆組件 1.分析功能:日曆基本功能,點擊事件改變日期,樣式的改變 1.結構分析:html 1.分為上下兩個部分 2.上面分為左按鈕,中間內容展示,右按鈕 下麵分為周幾展示和日期展示 3.基本結構頁面html書寫 4.一些事件以及邏輯 1.使得當前的日期為今天的日期 2.設置該月日 ...
  • 今天我們來看一下單件模式,這個模式是所有模式中類圖最簡單的哦! 為什麼用單件模式: 有些對象我們只需要一個,比如:連接池、緩存、對話框、和註冊表對象、日誌對 象等對象。事實上,這類對象只能有一個實例,如果製造出多個實例,就會導致許 多問題產生,例如:程式的行為異常、資源使用過量,或者是不一致的結果。 ...
  • Spring 自定義了繼承 JDK 事件監聽器的介面 ,用來監聽 Spring 應用程式中的事件;自定義了繼承 JDK 事件對象的抽象類 ,用來表示 Spring 應用程式中的事件類型。容器啟動刷新過程中,可以定義事件監聽器,來監聽應用程式上下文事件 ,它有四個具體的實現類: ,分別表示容器啟動/刷 ...
  • 變數 什麼是變數? 變數,是用於在記憶體中存放程式數據的容器 電腦的最核心功能就是“計算”, 計算需要數據源,數據源要存在記憶體里,比如我要把小明的姓名、身高、年齡信息存下來,後面程式會調用。 怎樣定義一個變數? 直接設置一個“變數名=值” 怎樣調用變數? 後面程式想調用的時候,直接調 變數名 就可以 ...
  • 寫兩個線程,其中一個線程列印1-52,另一個線程列印A-Z,列印順序應該是12A34B56C......5152Z。 該習題需要用到多線程通信的知識。 思路分析: 把列印數字的線程稱為線程N,列印字母的線程稱為線程L. 1.線程N完成列印後,需要等待,通知線程L列印;同理,線程L列印後,也需要等待, ...
  • 9.1反射 在Go語言標準庫中reflect包提供了運行時反射,程式運行過程中動態操作結構體 當變數存儲結構體屬性名稱,想要對結構體這個屬性賦值或查看時,就可以使用反射 反射還可以用作判斷變數類型 整個reflect包中最重要的兩個類型 reflect.Type類型 reflect.Value值 獲 ...
  • 前段時間工作上比較忙,這篇文章一直沒來得及寫,本文是閱讀《Java8實戰》的時候,瞭解到Java 8里已經提供了一個非同步非阻塞的介面(CompletableFuture),可以實現簡單的響應式編程的模式,因此用這篇文章做個梳理。我是帶著下麵這幾個問題去學習CompletableFuture這個介面的 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...