[Spring cloud 一步步實現廣告系統] 15. 使用開源組件監聽Binlog 實現增量索引準備

来源:https://www.cnblogs.com/zhangpan1244/archive/2019/08/09/11329817.html
-Advertisement-
Play Games

MySQL Binlog簡介 什麼是binlog? 一個二進位日誌,用來記錄對數據發生或潛在發生更改的SQL語句,並以而進行的形式保存在磁碟中。 binlog 的作用? 最主要有3個用途: 數據複製(主從同步) Mysql 的Master Slave協議,讓Slave可以通過監聽binlog實現數據 ...


MySQL Binlog簡介
  • 什麼是binlog?

一個二進位日誌,用來記錄對數據發生或潛在發生更改的SQL語句,並以而進行的形式保存在磁碟中。

  • binlog 的作用?

最主要有3個用途:

  • 數據複製(主從同步)
    Mysql 的Master-Slave協議,讓Slave可以通過監聽binlog實現數據複製,達到數據一致性目的
  • 數據恢復
    通過mysqlbinlog工具恢複數據
  • 增量備份
  • Binlog 變數
    • log_bin (Binlog 開關,使用show variables like 'log_bin';查看)
    • binlog_format (Binlog 日誌格式,使用show variables like 'binlog_format';查看)
      日誌格式總共有三種:
      • ROW, 僅保存記錄被修改的細節,不記錄SQL語句上下文相關信息。(能清晰的記錄下每行數據的修改細節,不需要記錄上下文相關信息,因此不會發生某些特定情況下的procedure、function以及trigger 的調用無法被準確複製的問題,任何情況下都可以被覆制,且能加快從庫重放日誌的效率,保證從庫數據的一致性)
      • STATEMENT,每一條修改數據的SQL都會被記錄。(只記錄執行語句的細節和上下文環境,避免了記錄每一行的變化,在一些修改記錄較多的情況下,相比ROW類型能大大減少binlog的日誌量,節約IO,提高性能。還可以用於實時的還原,同時主從版本可以不一樣,從伺服器版本可以比主伺服器版本高)
      • MIXED, 上述2種的混合使用
  • Binlog 管理
    • show master logs; 查看所有binlog的日誌列表
    • show master status; 查看最後一個binlog日誌編號名稱,以及最後一個事件技術的位置(position)
    • Flush logs; 刷新binlog,此刻開始產生一個新編號的binlog日誌文件
    • reset master; 清空所有的binlog日誌
  • Binlog 相關SQL show binlog events[in 'log_name'][from position][limit [offset,]row_count]
    UTOOLS1565224352749.png
    UTOOLS1565224799877.png
  • 常用的Binlog event
    • QUERY - 與數據無關的操作,begin、drop table、truncate table等等
    • TABLE_MAP - 記錄下一個操作所對應的表信息,存儲了資料庫名稱和表名稱
    • XID - 標記事務提交
    • WRITE_ROWS 插入數據,即insert操作
    • UPDATE_ROWS 更新數據,即update操作
    • DELETE_ROWS 刪除數據,即delete操作

Event包含header和data兩部分,header提供了event的創建時間,哪個伺服器等信息,data部分提供的是針對該event的具體信息,如具體數據的修改。
Tip: binlog不會記錄數據表的列名
在接下來的實現中,我們會將自己的系統包裝成一個假的Mysql Slave,通過開源工具mysql-binlog-connector-java來實現監聽binlog。

開源工具mysql-binlog-connector-java
<!-- binlog 日誌監聽,解析開源工具類庫 -->
<dependency>
    <groupId>com.github.shyiko</groupId>
    <artifactId>mysql-binlog-connector-java</artifactId>
    <version>0.18.1</version>
</dependency>

2.創建一個測試介面

package com.sxzhongf.ad.service;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;

import java.io.IOException;

/**
 * BinlogServiceTest for 測試Mysql binlog 監控
 * {@code
 * Mysql8 連接提示 Client does not support authentication protocol requested by server; consider upgrading MySQL client 解決方法
 * USE mysql;
 * ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'password';
 * FLUSH PRIVILEGES;
 * }
 *
 * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
 */
public class BinlogServiceTest {

    /**
     * --------Update-----------
     * UpdateRowsEventData{tableId=90, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5, 6, 7}, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
     *     {before=[11, 10, Test Bin Log, 1, Tue Jun 25 08:00:00 CST 2019, Tue Jun 25 08:00:00 CST 2019, Tue Jun 25 08:00:00 CST 2019, Tue Jun 25 08:00:00 CST 2019], after=[11, 10, zhangpan test Binlog, 1, Tue Jun 25 08:00:00 CST 2019, Tue Jun 25 08:00:00 CST 2019, Tue Jun 25 08:00:00 CST 2019, Tue Jun 25 08:00:00 CST 2019]}
     * ]}
     *
     * --------Insert-----------
     * WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
     *     [10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
     * ]}
     */

    public static void main(String[] args) throws IOException {

//        //構造BinaryLogClient,填充mysql鏈接信息
        BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306,
                "root", "12345678"
        );

        //設置需要讀取的Binlog的文件以及位置,否則,client會從"頭"開始讀取Binlog並監聽
//        client.setBinlogFilename("binlog.000035");
//        client.setBinlogPosition();

        //給客戶端註冊監聽器,實現對Binlog的監聽和解析
        //event 就是監聽到的Binlog變化信息,event包含header & data 兩部分
        client.registerEventListener(event -> {
            EventData data = event.getData();
            if (data instanceof UpdateRowsEventData) {
                System.out.println("--------Update-----------");
                System.out.println(data.toString());
            } else if (data instanceof WriteRowsEventData) {
                System.out.println("--------Insert-----------");
                System.out.println(data.toString());
            } else if (data instanceof DeleteRowsEventData) {
                System.out.println("--------Delete-----------");
                System.out.println(data.toString());
            }
        });

        client.connect();
    }
}

運行:

八月 08, 2019 9:13:32 上午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to 127.0.0.1:3306 at binlog.000038/951 (sid:65535, cid:336)
...

執行sql update ad_user set user_status=1 where user_id=10;

UTOOLS1565227012106.png

我們需要知道的是,我們的目的是實現對Mysql數據表的變更實現監聽,並解析成我們想要的格式,也就是我們的java對象。根據上面我們看到的監聽結果,我們知道了返回信息的大概內容,既然我們已經學會了簡單的使用BinaryLogClient 來監聽binlog,接下來,我們需要定義一個監聽器,來實現我們自己的業務內容。

因為我們只需要Event中的內容,那麼我們也就只需要通過實現com.github.shyiko.mysql.binlog.BinaryLogClient.EventListener介面,來自定義一個監聽器實現我們的業務即可。通過Event的內容,來判定是否需要處理當前event以及如何處理。

構造解析binlog的模版文件

我們監聽binlog來構造增量數據的根本原因,是為了將我們的廣告投放系統廣告檢索系統 業務解耦,由於我們的檢索系統中沒有定義資料庫以及數據表的相關,所以,我們通過定義一份模版文件,通過解析模版文件來得到我們需要的資料庫和表信息,因為binlog的監聽是不區分是哪個資料庫和哪個數據表信息的,我們可以通過模版來指定我們想要監聽的部分。

{
  "database": "advertisement",
  "tableList": [
    {
      "tableName": "ad_plan",
      "level": 2,
      "insert": [
        {
          "column": "plan_id"
        },
        {
          "column": "user_id"
        },
        {
          "column": "plan_status"
        },
        {
          "column": "start_date"
        },
        {
          "column": "end_date"
        }
      ],
      "update": [
        {
          "column": "plan_id"
        },
        {
          "column": "user_id"
        },
        {
          "column": "plan_status"
        },
        {
          "column": "start_date"
        },
        {
          "column": "end_date"
        }
      ],
      "delete": [
        {
          "column": "plan_id"
        }
      ]
    },
    {
      "tableName": "ad_unit",
      "level": 3,
      "insert": [
        {
          "column": "unit_id"
        },
        {
          "column": "unit_status"
        },
        {
          "column": "position_type"
        },
        {
          "column": "plan_id"
        }
      ],
      "update": [
        {
          "column": "unit_id"
        },
        {
          "column": "unit_status"
        },
        {
          "column": "position_type"
        },
        {
          "column": "plan_id"
        }
      ],
      "delete": [
        {
          "column": "unit_id"
        }
      ]
    },
    {
      "tableName": "ad_creative",
      "level": 2,
      "insert": [
        {
          "column": "creative_id"
        },
        {
          "column": "type"
        },
        {
          "column": "material_type"
        },
        {
          "column": "height"
        },
        {
          "column": "width"
        },
        {
          "column": "audit_status"
        },
        {
          "column": "url"
        }
      ],
      "update": [
        {
          "column": "creative_id"
        },
        {
          "column": "type"
        },
        {
          "column": "material_type"
        },
        {
          "column": "height"
        },
        {
          "column": "width"
        },
        {
          "column": "audit_status"
        },
        {
          "column": "url"
        }
      ],
      "delete": [
        {
          "column": "creative_id"
        }
      ]
    },
    {
      "tableName": "relationship_creative_unit",
      "level": 3,
      "insert": [
        {
          "column": "creative_id"
        },
        {
          "column": "unit_id"
        }
      ],
      "update": [
      ],
      "delete": [
        {
          "column": "creative_id"
        },
        {
          "column": "unit_id"
        }
      ]
    },
    {
      "tableName": "ad_unit_district",
      "level": 4,
      "insert": [
        {
          "column": "unit_id"
        },
        {
          "column": "province"
        },
        {
          "column": "city"
        }
      ],
      "update": [
      ],
      "delete": [
        {
          "column": "unit_id"
        },
        {
          "column": "province"
        },
        {
          "column": "city"
        }
      ]
    },
    {
      "tableName": "ad_unit_hobby",
      "level": 4,
      "insert": [
        {
          "column": "unit_id"
        },
        {
          "column": "hobby_tag"
        }
      ],
      "update": [
      ],
      "delete": [
        {
          "column": "unit_id"
        },
        {
          "column": "hobby_tag"
        }
      ]
    },
    {
      "tableName": "ad_unit_keyword",
      "level": 4,
      "insert": [
        {
          "column": "unit_id"
        },
        {
          "column": "keyword"
        }
      ],
      "update": [
      ],
      "delete": [
        {
          "column": "unit_id"
        },
        {
          "column": "keyword"
        }
      ]
    }
  ]
}

上面的模版文件中,指定了一個資料庫為advertisement,大家可以方便添加多個監聽庫。在資料庫下麵,我們監聽了幾個表的CUD操作以及每個操作所需要的欄位信息。

  • 實現模版 —> Java Entity
    • 定義模版文件對應的實體
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class BinlogTemplate {
          //單資料庫對應
        private String database;
          //多表
        private List<JsonTable> tableList;
    }
    • 對應的json 中 table信息
    /**
     * JsonTable for 用於表示template.json中對應的表信息
     *
     * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class JsonTable {
        private String tableName;
        private Integer level;
    
        private List<Column> insert;
        private List<Column> update;
        private List<Column> delete;
    
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class Column {
            private String columnName;
        }
    }
    • 讀取的對應表信息對象(最主要目的就是為了能將欄位索引 映射到 欄位名稱
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class TableTemplate {
        private String tableName;
        private String level;
    
          //操作類型 -> 多列
        private Map<OperationTypeEnum, List<String>> opTypeFieldSetMap = new HashMap<>();
    
        /**
         * Binlog日誌中 欄位索引 -> 欄位名稱 的一個轉換映射
         * 因為binlog中不會顯示更新的列名是什麼,它只會展示欄位的索引,因此我們需要實現一次轉換
         */
        private Map<Integer, String> posMap = new HashMap<>();
    }
    • 解析模版文件到java對象
    @Data
    public class ParseCustomTemplate {
    
        private String database;
    
        /**
         * key -> TableName
         * value -> {@link TableTemplate}
         */
        private Map<String, TableTemplate> tableTemplateMap;
    
        public static ParseCustomTemplate parse(BinlogTemplate _template) {
            ParseCustomTemplate template = new ParseCustomTemplate();
            template.setDatabase(_template.getDatabase());
    
            for (JsonTable jsonTable : _template.getTableList()) {
                String name = jsonTable.getTableName();
                Integer level = jsonTable.getLevel();
    
                TableTemplate tableTemplate = new TableTemplate();
                tableTemplate.setTableName(name);
                tableTemplate.setLevel(level.toString());
                template.tableTemplateMap.put(name, tableTemplate);
    
                //遍歷操作類型對應的列信息
                Map<OperationTypeEnum, List<String>> operationTypeListMap = tableTemplate.getOpTypeFieldSetMap();
    
                for (JsonTable.Column column : jsonTable.getInsert()) {
                    getAndCreateIfNeed(
                            OperationTypeEnum.ADD,
                            operationTypeListMap,
                            ArrayList::new
                    ).add(column.getColumnName());
                }
    
                for (JsonTable.Column column : jsonTable.getUpdate()) {
                    getAndCreateIfNeed(
                            OperationTypeEnum.UPDATE,
                            operationTypeListMap,
                            ArrayList::new
                    ).add(column.getColumnName());
                }
    
                for (JsonTable.Column column : jsonTable.getDelete()) {
                    getAndCreateIfNeed(
                            OperationTypeEnum.DELETE,
                            operationTypeListMap,
                            ArrayList::new
                    ).add(column.getColumnName());
                }
            }
    
            return template;
        }
    
        /**
         * 從Map中獲取對象,如果不存在,創建一個
         */
        private static <T, R> R getAndCreateIfNeed(T key, Map<T, R> map, Supplier<R> factory) {
            return map.computeIfAbsent(key, k -> factory.get());
        }
    }
    • 解析 欄位索引 -> 欄位名稱 的一個轉換映射

    首先,我們來看一下binlog的具體日誌信息:

    --------Insert-----------
    WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
    [10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
    --------Update-----------
    UpdateRowsEventData{tableId=81, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5}, includedColumns={0, 1, 2, 3, 4, 5}, rows=[
        {before=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 0, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019], after=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 1, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019]}
    

    可以看到,在日誌中includedColumns只包含了{0, 1, 2, 3, 4, 5}位置信息,那麼我們怎麼能知道它具體代表的是哪個欄位呢,接下來我們來實現這步映射關係,在實現之前,我們先來查詢一下資料庫中我們的表中欄位所處的具體位置:

    sql> SELECT table_schema,table_name,column_name,ordinal_position FROM information_schema.COLUMNS
    WHERE TABLE_SCHEMA = 'advertisement' AND TABLE_NAME='ad_user'

    UTOOLS1565311337937.png

    我們可以看到ordinal_position對應的是1-6,可是上面監聽到的binlog日誌索引是0-5,所以我們就可以看出來之間的對應關係。

    我們開始編碼實現,我們使用JdbcTemplate進行查詢資料庫信息:

    @Slf4j
    @Component
    public class TemplateHolder {
        private ParseCustomTemplate template;
    
        private final JdbcTemplate jdbcTemplate;
    
        private String SQL_SCHEMA = "SELECT TABLE_SCHEMA,TABLE_NAME,COLUMN_NAME,ORDINAL_POSITION FROM information_schema.COLUMNS " +
                "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?";
    
        @Autowired
        public TemplateHolder(JdbcTemplate jdbcTemplate) {
            this.jdbcTemplate = jdbcTemplate;
        }
    
        /**
         * 需要在容器載入的時候,就載入數據信息
         */
        @PostConstruct
        private void init() {
            loadJSON("template.json");
        }
    
        /**
         * 對外提供載入服務
         */
        public TableTemplate getTable(String tableName) {
            return template.getTableTemplateMap().get(tableName);
        }
    
        /**
         * 載入需要監聽的binlog json文件
         */
        private void loadJSON(String path) {
            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            InputStream inputStream = classLoader.getResourceAsStream(path);
    
            try {
                BinlogTemplate binlogTemplate = JSON.parseObject(
                        inputStream,
                        Charset.defaultCharset(),
                        BinlogTemplate.class
                );
    
                this.template = ParseCustomTemplate.parse(binlogTemplate);
                loadMeta();
            } catch (IOException ex) {
                log.error((ex.getMessage()));
                throw new RuntimeException("fail to parse json file");
            }
        }
    
        /**
         * 載入元信息
         * 使用表索引到列名稱的映射關係
         */
        private void loadMeta() {
            for (Map.Entry<String, TableTemplate> entry : template.getTableTemplateMap().entrySet()) {
                TableTemplate table = entry.getValue();
    
                List<String> updateFields = table.getOpTypeFieldSetMap().get(
                        OperationTypeEnum.UPDATE
                );
                List<String> insertFields = table.getOpTypeFieldSetMap().get(
                        OperationTypeEnum.ADD
                );
                List<String> deleteFields = table.getOpTypeFieldSetMap().get(
                        OperationTypeEnum.DELETE
                );
    
                jdbcTemplate.query(SQL_SCHEMA, new Object[]{
                                template.getDatabase(), table.getTableName()
                        }, (rs, i) -> {
                            int pos = rs.getInt("ORDINAL_POSITION");
                            String colName = rs.getString("COLUMN_NAME");
    
                            if ((null != updateFields && updateFields.contains(colName))
                                || (null != insertFields && insertFields.contains(colName))
                                || (null != deleteFields && deleteFields.contains(colName))) {
                                         table.getPosMap().put(pos - 1, colName);
                            }
                            return null;
                        }
                );
            }
        }
    }
    • 監聽binlog實現
      • 定義Event 解析所需要轉換的java對象
      @Data
      public class BinlogRowData {
      
          private TableTemplate tableTemplate;
      
          private EventType eventType;
      
          private List<Map<String, String>> before;
      
          private List<Map<String, String>> after;
      
      }
      • 定義binlog client BinaryLogClient
      /**
       * CustomBinlogClient for 自定義Binlog Client
       *
       * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
       * @since 2019/6/27
       */
      @Slf4j
      @Component
      public class CustomBinlogClient {
      
          private BinaryLogClient client;
      
          private final BinlogConfig config;
          private final AggregationListener listener;
      
          @Autowired
          public CustomBinlogClient(BinlogConfig config, AggregationListener listener) {
              this.config = config;
              this.listener = listener;
          }
      
          public void connect() {
              new Thread(() -> {
                  client = new BinaryLogClient(
                          config.getHost(),
                          config.getPort(),
                          config.getUsername(),
                          config.getPassword()
                  );
      
                  if (!StringUtils.isEmpty(config.getBinlogName()) && !config.getPosition().equals(-1L)) {
                      client.setBinlogFilename(config.getBinlogName());
                      client.setBinlogPosition(config.getPosition());
                  }
      
                  try {
                      log.info("connecting to mysql start...");
                      client.connect();
                      log.info("connecting to mysql done!");
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }).start();
          }
      
          public void disconnect() {
              try {
                  log.info("disconnect to mysql start...");
                  client.disconnect();
                  log.info("disconnect to mysql done!");
              } catch (IOException e) {
                  e.printStackTrace();
              }
          }
      }
      • 使用client註冊事件監聽器com.github.shyiko.mysql.binlog.BinaryLogClient.EventListener
      /**
       * Ilistener for 為了後續擴展不同的實現
       *
       * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
       */
      public interface Ilistener {
      
          void register();
      
          void onEvent(BinlogRowData eventData);
      }
      • 監聽Binlog, 收集mysql binlog datas
      @Slf4j
      @Component
      public class AggregationListener implements BinaryLogClient.EventListener {
      
          private String dbName;
          private String tbName;
      
          private Map<String, Ilistener> listenerMap = new HashMap<>();
      
          @Autowired
          private TemplateHolder templateHolder;
      
          private String genKey(String dbName, String tbName) {
              return dbName + ":" + tbName;
          }
      
          /**
           * 根據表實現註冊信息
           */
          public void register(String dbName, String tbName, Ilistener listener) {
              log.info("register : {}-{}", dbName, tbName);
              this.listenerMap.put(genKey(dbName, tbName), listener);
          }
      
          @Override
          public void onEvent(Event event) {
      
              EventType type = event.getHeader().getEventType();
              log.info("Event type: {}", type);
      
              //資料庫增刪改之前,肯定有一個table_map event 的binlog
              if (type == EventType.TABLE_MAP) {
                  TableMapEventData data = event.getData();
                  this.tbName = data.getTable();
                  this.dbName = data.getDatabase();
                  return;
              }
      
              //EXT_UPDATE_ROWS 是Mysql 8以上的type
              if (type != EventType.EXT_UPDATE_ROWS
                      && type != EventType.EXT_WRITE_ROWS
                      && type != EventType.EXT_DELETE_ROWS
                      ) {
                  return;
              }
      
              // 檢查表名和資料庫名是否已經正確填充
              if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tbName)) {
                  log.error("Meta data got error. tablename:{},database:{}", tbName, dbName);
                  return;
              }
      
              //找出對應數據表敏感的監聽器
              String key = genKey(this.dbName, this.tbName);
              Ilistener ilistener = this.listenerMap.get(key);
              if (null == ilistener) {
                  log.debug("skip {}", key);
              }
      
              log.info("trigger event:{}", type.name());
      
              try {
                  BinlogRowData rowData = convertEventData2BinlogRowData(event.getData());
                  if (null == rowData) {
                      return;
                  }
                  rowData.setEventType(type);
                  ilistener.onEvent(rowData);
      
              } catch (Exception e) {
                  e.printStackTrace();
                  log.error(e.getMessage());
              } finally {
                  this.dbName = "";
                  this.tbName = "";
              }
          }
      
          /**
           * 解析Binlog數據到Java實體對象的映射
           *
           * @param data binlog
           * @return java 對象
           */
          private BinlogRowData convertEventData2BinlogRowData(EventData data) {
              TableTemplate tableTemplate = templateHolder.getTable(tbName);
              if (null == tableTemplate) {
                  log.warn("table {} not found.", tbName);
                  return null;
              }
      
              List<Map<String, String>> afterMapList = new ArrayList<>();
      
              for (Serializable[] after : getAfterValues(data)) {
                  Map<String, String> afterMap = new HashMap<>();
      
                  int columnLength = after.length;
                  for (int i = 0; i < columnLength; ++i) {
                      //取出當前位置對應的列名
                      String colName = tableTemplate.getPosMap().get(i);
                      //如果沒有,則說明不需要該列
                      if (null == colName) {
                          log.debug("ignore position: {}", i);
                          continue;
                      }
      
                      String colValue = after[i].toString();
                      afterMap.put(colName, colValue);
                  }
      
                  afterMapList.add(afterMap);
              }
      
              BinlogRowData binlogRowData = new BinlogRowData();
              binlogRowData.setAfter(afterMapList);
              binlogRowData.setTableTemplate(tableTemplate);
      
              return binlogRowData;
          }
      
          /**
           * 獲取不同事件的變更後數據
           * Add & Delete變更前數據假定為空
           */
          private List<Serializable[]> getAfterValues(EventData eventData) {
      
              if (eventData instanceof WriteRowsEventData) {
                  return ((WriteRowsEventData) eventData).getRows();
              }
      
              if (eventData instanceof UpdateRowsEventData) {
                  return ((UpdateRowsEventData) eventData).getRows()
                                                          .stream()
                                                          .map(Map.Entry::getValue)
                                                          .collect(Collectors.toList()
                                                          );
              }
      
              if (eventData instanceof DeleteRowsEventData) {
                  return ((DeleteRowsEventData) eventData).getRows();
              }
      
              return Collections.emptyList();
          }
      }
      • 解析binlog 數據對象BinlogRowData ,用於增量索引的後續處理
      /**
       * MysqlRowData for 簡化{@link BinlogRowData} 以方便實現增量索引的實現
       *
       * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
       */
      @Data
      @AllArgsConstructor
      @NoArgsConstructor
      public class MysqlRowData {
      
          //實現多數據的時候,需要傳遞資料庫名稱
          //private String database;
          private String tableName;
          private String level;
          private OperationTypeEnum operationTypeEnum;
          private List<Map<String, String>> fieldValueMap = new ArrayList<>();
      }

      因為我們需要將Binlog EventType轉換為我們的操作類型OperationTypeEnum,所以,我們在OperationTypeEnum中添加一個轉換方法:

      public enum OperationTypeEnum {
      ...
          public static OperationTypeEnum convert(EventType type) {
              switch (type) {
                  case EXT_WRITE_ROWS:
                      return ADD;
                  case EXT_UPDATE_ROWS:
                      return UPDATE;
                  case EXT_DELETE_ROWS:
                      return DELETE;
                  default:
                      return OTHER;
              }
          }
      }

      我們還需要定義一個表包含的各個列名稱的java類,方便我們後期對數據表的CUD操作:

      package com.sxzhongf.ad.mysql.constant;
      
      import java.util.HashMap;
      import java.util.Map;
      
      /**
       * Constant for 各個列名稱的java類,方便我們後期對數據表的CUD操作
       *
       * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
       */
      public class Constant {
      
          private static final String DATABASE_NAME = "advertisement";
      
          public static class AD_PLAN_TABLE_INFO {
      
              public static final String TABLE_NAME = "ad_plan";
      
              public static final String COLUMN_PLAN_ID = "plan_id";
              public static final String COLUMN_USER_ID = "user_id";
              public static final String COLUMN_PLAN_STATUS = "plan_status";
              public static final String COLUMN_START_DATE = "start_date";
              public static final String COLUMN_END_DATE = "end_date";
          }
      
          public static class AD_CREATIVE_TABLE_INFO {
      
              public static final String TABLE_NAME = "ad_creative";
      
              public static final String COLUMN_CREATIVE_ID = "creative_id";
              public static final String COLUMN_TYPE = "type";
              public static final String COLUMN_MATERIAL_TYPE = "material_type";
              public static final String COLUMN_HEIGHT = "height";
              public static final String COLUMN_WIDTH = "width";
              public static final String COLUMN_AUDIT_STATUS = "audit_status";
              public static final String COLUMN_URL = "url";
          }
      
          public static class AD_UNIT_TABLE_INFO {
      
              public static final String TABLE_NAME = "ad_unit";
      
              public static final String COLUMN_UNIT_ID = "unit_id";
              public static final String COLUMN_UNIT_STATUS = "unit_status";
              public static final String COLUNN_POSITION_TYPE = "position_type";
              public static final String COLUNN_PLAN_ID = "plan_id";
          }
      
          public static class RELATIONSHIP_CREATIVE_UNIT_TABLE_INFO {
      
              public static final String TABLE_NAME = "relationship_creative_unit";
      
              public static final String COLUMN_CREATIVE_ID = "creative_id";
              public static final String COLUMN_UNIT_ID = "unit_id";
          }
      
          public static class AD_UNIT_DISTRICT_TABLE_INFO {
      
              public static final String TABLE_NAME = "ad_unit_district";
      
              public static final String COLUMN_UNIT_ID = "unit_id";
              public static final String COLUMN_PROVINCE = "province";
              public static final String COLUMN_CITY = "city";
          }
      
          public static class AD_UNIT_KEYWORD_TABLE_INFO {
      
              public static final String TABLE_NAME = "ad_unit_keyword";
      
              public static final String COLUMN_UNIT_ID = "unit_id";
              public static final String COLUMN_KEYWORD = "keyword";
          }
      
          public static class AD_UNIT_HOBBY_TABLE_INFO {
      
              public static final String TABLE_NAME = "ad_unit_hobby";
      
              public static final String COLUMN_UNIT_ID = "unit_id";
              public static final String COLUMN_HOBBY_TAG = "hobby_tag";
          }
      
          //key -> 表名
          //value -> 資料庫名
          public static Map<String, String> table2db;
      
          static {
              table2db = new HashMap<>();
              table2db.put(AD_PLAN_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
              table2db.put(AD_CREATIVE_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
              table2db.put(AD_UNIT_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
              table2db.put(RELATIONSHIP_CREATIVE_UNIT_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
              table2db.put(AD_UNIT_DISTRICT_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
              table2db.put(AD_UNIT_HOBBY_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
              table2db.put(AD_UNIT_KEYWORD_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
          }
      }
      

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 消息隊列在現今數據量超大,併發量超高的系統中是十分常用的。本文將會對現時最常用到的幾款消息隊列框架 ActiveMQ、RabbitMQ、Kafka 進行分析對比。 詳細介紹 RabbitMQ 在 Sprinig 框架下的結構及實現原理,從Producer 端的事務、回調函數(ConfirmCallb... ...
  • 今天我們來講一講抽象工廠: 重要涉及原則:要依賴抽象,不要依賴具體。 首先我們需要瞭解一個設計原則——依賴倒置原則:減少對具體的依賴,所謂的倒置是倒置的 僅僅是指的和一般OO設計的思考方式完 全相反(不能讓高層組件依賴底層組件, 而且,不管高層組件還是底層組件,“ 兩者”都應該依賴於抽象)。 你應該 ...
  • 一、註意點 1.大容量不能直接賦值給小容量;大容量轉化為小容量需要進行,強制類型轉換,強制類型轉換需要加上“強制類型轉換符”,加上強制類型轉換符之後編譯通過了但是精度會有有可能損失。所以強制類型轉換要謹慎使用。因為損失精度之後可能損失很嚴重。 例子: 底層原理:long是八個位元組,現在要轉為四個位元組 ...
  • == 和 equals的區別 ==是一個運算符 用於比較兩端的內容是否相等 基本數據類型 : 兩端的值是否相等 引用類型 : 比較的是引用的值(記憶體指向的地址)是否相等 equals() : 它是Object類的一個方法 子類繼承到這個方法之後 可以按照自己所需要的邏輯需求 覆蓋這個方法 從而描述自 ...
  • 一般部署項目到伺服器,會安裝uwsgi,但是很多教程在安裝它的時候會讓你測試一下安裝好了沒,於是就有很多像我一樣懵逼的少年掉進一個坑裡出不來,很久、很久... 那就是最後瀏覽器輸入ip:8000埠後伺服器有反應,但是瀏覽器一片空白 原因:因為測試用的代碼是對python2.x 環境測試的,他喵的現 ...
  • 1.到官網下載centOS對應版本的xampp,應該是以tar.gz為尾碼的 2.tar -zxf 下載的包 3.mv lampp /opt 4.service mysqld stop因xampp里自己帶一個mysql,會和之前裝的mysql埠衝突,故停掉單獨裝的mysql 5.關閉防火牆 ser ...
  • 前提: 當在程式測試時,如果你需要定義一個自己的異常,而非現在已經存在的異常,這個時候你需要用到throws和throw,try catch只是一個簡單的捕獲異常的過程。 代碼如下: 其實throw的使用很簡單 首先你必須要定義一個異常類,因為你要自己拋出一個以前沒有見過的異常 其次你要寫你定義的這 ...
  • 首先要搞懂本地操作系統編碼與系統編碼的區別: 1. 本地操作系統編碼方式與操作系統有關,Linux預設編碼方式為utf 8,Windows預設編碼方式為gbk; 2. 系統編碼方式與編譯器or解釋器有關,Python3解釋器預設編碼方式為unicode。 3. 編碼方式不僅僅代表編碼,也包括解碼,因 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...