Mysql 流增量寫入 Hdfs(一) --從 mysql 到 kafka

来源:https://www.cnblogs.com/listenfwind/archive/2018/12/08/10089082.html
-Advertisement-
Play Games

一. 概述 在大數據的靜態數據處理中,目前普遍採用的是用 Spark + Hdfs (Hive / Hbase) 的技術架構來對數據進行處理。 但有時候有其他的需求,需要從其他不同數據源不間斷得採集數據,然後存儲到 Hdfs 中進行處理。而追加(append)這種操作在 Hdfs 裡面明顯是比較麻煩 ...


一. 概述

在大數據的靜態數據處理中,目前普遍採用的是用 Spark + Hdfs (Hive / Hbase) 的技術架構來對數據進行處理。

但有時候有其他的需求,需要從其他不同數據源不間斷得採集數據,然後存儲到 Hdfs 中進行處理。而追加(append)這種操作在 Hdfs 裡面明顯是比較麻煩的一件事。所幸有了 Storm 這麼個流數據處理這樣的東西問世,可以幫我們解決這些問題。

不過光有 Storm 還不夠,我們還需要其他中間件來協助我們,讓所有其他數據源都歸於一個通道。這樣就能實現不同數據源以及 Hhdfs 之間的解耦。而這個中間件 Kafka 無疑是一個很好的選擇。

這樣我們就可以讓 Mysql 的增量數據不停得拋出到 Kafka ,而後再讓 storm 不停得從 Kafka 對應的 Topic 讀取數據並寫入到 Hdfs 中。

二. 基本知識

2.1 Mysql binlog 介紹

binlog 即 Mysql 的二進位日誌。它可以說是 Mysql 最重要的日誌了,它記錄了所有的DDL和DML(除了數據查詢語句)語句,以事件形式記錄,還包含語句所執行的消耗的時間,MySQL的二進位日誌是事務安全型的。

上面所說的提到了 DDL 和 DML ,可能有些同學不瞭解,這裡順便說一下:

  • DDL:數據定義語言DDL用來創建資料庫中的各種對象-----表、視圖、索引、同義詞、聚簇等如:CREATE TABLE/VIEW/INDEX/SYN/CLUSTER...
  • DML:數據操縱語言DML主要有三種形式:插入(INSERT), 更新(UPDATE),以及刪除(DELETE)。

在 Mysql 中,binlog 預設是不開啟的,因為有大約 1% (官方說法)的性能損耗,如果要手動開啟,流程如下:

  1. vi編輯打開mysql配置文件:
vi /usr/local/mysql/etc/my.cnf

在[mysqld] 區塊設置/添加如下,

log-bin=mysql-bin 

註意一定要在 [mysqld] 下。

  1. 重啟 Mysql
pkill mysqld
/usr/local/mysql/bin/mysqld_safe --user=mysql &

2.2 kafka

這裡只對 Kafka 做一個基本的介紹,更多的內容可以度娘一波。

上面的圖片是 kafka 官方的一個圖片,我們目前只需要關註 Producers 和 Consumers 就行了。

Kafka 是一個分散式發佈-訂閱消息系統。分散式方面由 Zookeeper 進行協同處理。消息訂閱其實說白了吧,就是一個隊列,分為消費者和生產者,就像上圖中的內容,有數據源充當 Producer 生產數據到 kafka 中,而有數據充當 Consumers ,消費 kafka 中的數據。

上圖中的 offset 指的是數據的寫入以及消費的位置的信息,這是由 Zookeeper 管理的。也就是說,當 Consumers 重啟或是怎樣,需要重新從 kafka 讀取消息時,總不能讓它從頭開始消費數據吧,這時候就需要有個記錄能告訴你從哪裡開始重新讀取。這就是 offset 。

kafka 中還有一個至關重要的概念,那就是 topic 。不過這個其實還是很好理解的,比如你要訂閱一些消息,你肯定是不會訂閱所有消息的吧,你只需要訂閱你感興趣的主題,比如攝影,編程,搞笑這些主題。而這裡主題的概念其實和 topic 是一樣的。總之,可以將 topic 歸結為通道,kafka 中有很多個通道,不同的 Producer 向其中一個通道生產數據,也就是拋數據進去這個通道,Comsumers 不停得消費通道中的數據。

而我們要做的就是將 Mysql binlog 產生的數據拋到 kafka 中充當作生產者,然後由 storm 充當消費者,不停得消費數據並寫入到 Hdfs 中。

至於怎麼將 binlog 的數據拋到 kafka ,別急,下麵我們就來介紹。

2.3 maxwell

maxwell 這個工具可以很方便得監聽 Mysql 的 binlog ,然後每當 binlog 發生變化時,就會以 json 格式拋出對應的變化數據到 Kafka 中。比如當向 mysql 一張表中插入一條語句的時候,maxwell 就會立刻監聽到 binlog 中有對應的記錄增加,然後將一些信息包括插入的數據都轉化成 json 格式,然後拋到 kafka 指定的 topic 中。

下載地址在這裡可以找到。

除了 Kafka 外,其實 maxwell 還支持寫入到其他各種中間件,比如 redis。
同時 maxwell 是比較輕量級的工具,只需要在 mysql 中新建一個資料庫供它記錄一些信息,然後就可以直接運行。

三. 使用 maxwell 監聽 binlog

接下來我們將的是如果使用 maxwell ,讓它監聽 mysql 的 binlog 並拋到 kafka 中。maxwell 主要有兩種運行方式。一種是使用配置文件,另一種則是在命令行中添加參數的方式運行。這裡追求方便,只使用命令行的方式進行演示。

這裡介紹一下簡單的將數據拋到 kafka 的命令行腳本吧。

bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
   --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell --port=3306

各項參數說明如下

  • user:mysql 用戶名
  • password:mysql 密碼
  • host:Mysql 地址
  • producer:指定寫入的中間件類型,比如還有 redies
  • kafka.bootstrap.servers:kafka 的地址
  • kafka_topic:指明寫入到 kafka 哪個 topic
  • port:mysql 埠

啟動之後,maxwell 便開始工作了,當然如果你想要讓這條命令可以在後臺運行的話,可以使用 Linux 的 nohup 命令,這裡就不多贅述,有需要百度即可。

這樣配置的話通常會將整個資料庫的增刪改都給拋到 kafka ,但這樣的需求顯然不常見,更常見的應該是具體監聽對某個庫的操作,或是某個表的操作。

在升級到 1.9.2(最新版本)後,maxwell 為我們提供這樣一個參數,讓我們可以輕鬆實現上述需求:--filter

這個參數通常包含兩個配置項,exclude 和 include。意思就是讓你指定排除哪些和包含哪些。比如我只想監聽 Adatabase 庫下的 Atable 表的變化。我可以這樣。

--filter='exclude: *.*, include: Adatabase.Atable'

這樣我們就可以輕鬆實現監聽 mysql binlog 的變化,並可以定製自己的需求。

OK,這一章我們介紹了 mysql binlog ,kafka 以及 maxwell 的一些內容,下一篇我們將會看到 storm 如何寫入 hdfs 以及定製一些策略。see you~~


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1 今日內容(分頁機制初始化) 在初始化記憶體的結點和記憶體區域之前, 內核先通過pagging_init初始化了內核的分頁機制. 在分頁機制完成後, 才會開始初始化系統的記憶體數據結構(包括記憶體節點數據和記憶體區域), 併在隨後初始化buddy伙伴系統來接管記憶體管理的工作 2 分頁機制初始化 arm64架 ...
  • 1. 今日內容(第二階段(二)–初始化備用記憶體域列表zonelists) 我們之前講了在memblock完成之後, 記憶體初始化開始進入第二階段, 第二階段是一個漫長的過程, 它執行了一系列複雜的操作, 從體繫結構相關信息的初始化慢慢向上層展開, 其主要執行瞭如下操作 特定於體繫結構的設置 在完成了基 ...
  • 話不多說直接看步驟 * 系統版本: ubuntu 18.04.1 // 查看命令為: # cat /etc/issue 1. 先查看當前系統是否存在 fcitx 框架; # dpkg -l | grep fcitx 沒有的話 直接安裝 # sudo apt-get install fcitx 2. ...
  • Windows下Oracle 11g的安裝 Windows下Oracle 11g的安裝: Windows:64位, Oracle 11g版本:win64_11gR2_database_1of2(安裝包),win64_11gR2_database_2of2(輔助包) 註:先將win64_11gR2_d ...
  • scp是 secure copy的縮寫, scp是linux系統下基於ssh登陸進行安全的遠程文件拷貝命令。linux的scp命令可以在linux伺服器之間複製文件和目錄。 使用語法:scp [參數] [源路徑] @IP:/目標路徑 scp 參數如下: -1: 強制scp命令使用協議ssh1 -2: ...
  • 我們在導入sklearn時往往會報錯。 折騰了一下午之後的解決方案:1.shadowsocks翻牆,解決pip官網下載速度問題。2.用pip在anaconda Scripts目錄中卸載numpy,scipy,matplotlib,skicit-learn相關。 命令:pip uninstall nu ...
  • Linux系統中的 iostat是I/O statistics(輸入/輸出統計)的縮寫,iostat工具將對系統的磁碟操作活動進行監視。它的特點是彙報磁碟活動統計情況,同時也會彙報出CPU使用情況。同vmstat一樣,iostat也有一個弱點,就是它不能對某個進程進行深入分析,僅對系統的整體情況進行 ...
  • Oracle資料庫中的統計信息是這樣一組數據:它存儲在數據字典中,且從多個維度描述了Oracle資料庫里對象的詳細信息。 CBO會利用這些統計信息來計算目標SQL各種可能的,不同的執行路徑的成本,從中選擇一條成本最小的執行路徑來作為目標SQL的執行計劃。 統計信息分為以下六種: 表的統計信息 索引的 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...