Mysql 流增量寫入 Hdfs(二) --Storm + hdfs 的流式處理

来源:https://www.cnblogs.com/listenfwind/archive/2018/12/12/10111033.html
-Advertisement-
Play Games

一. 概述 上一篇我們介紹瞭如何將數據從 mysql 拋到 kafka,這次我們就專註於利用 storm 將數據寫入到 hdfs 的過程,由於 storm 寫入 hdfs 的可定製東西有些多,我們先不從 kafka 讀取,而先自己定義一個 Spout 數據充當數據源,下章再進行整合。這裡預設你是擁有 ...


一. 概述

上一篇我們介紹瞭如何將數據從 mysql 拋到 kafka,這次我們就專註於利用 storm 將數據寫入到 hdfs 的過程,由於 storm 寫入 hdfs 的可定製東西有些多,我們先不從 kafka 讀取,而先自己定義一個 Spout 數據充當數據源,下章再進行整合。這裡預設你是擁有一定的 storm 知識的基礎,起碼知道 Spout 和 bolt 是什麼。

寫入 hdfs 可以有以下的定製策略:

  1. 自定義寫入文件的名字
  2. 定義寫入內容格式
  3. 滿足給定條件後更改寫入的文件
  4. 更改寫入文件時觸發的 Action

本篇會先說明如何用 storm 寫入 HDFS,寫入過程一些 API 的描述,以及最後給定一個例子:

storm 每接收到 10 個 Tuple 後就會改變 hdfs 寫入文件,新文件的名字就是第幾次改變。

ps:storm 版本:1.1.1 。Hadoop 版本:2.7.4 。

接下來我們首先看看 Storm 如何寫入 HDFS 。

二. Storm 寫入 HDFS

Storm 官方有提供了相應的 API 讓我們可以使用。可以通過創建 HdfsBolt 以及定義相應的規則,即可寫入 HDFS 。

首先通過 maven 配置依賴以及插件。


    <properties>
        <storm.version>1.1.1</storm.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <!--<scope>provided</scope>-->
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>log4j-over-slf4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>15.0</version>
        </dependency>

        <!--hadoop模塊-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.4</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-hdfs -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hdfs</artifactId>
            <version>1.1.1</version>
            <!--<scope>test</scope>-->
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>exec</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <executable>java</executable>
                    <includeProjectDependencies>true</includeProjectDependencies>
                    <includePluginDependencies>false</includePluginDependencies>
                    <classpathScope>compile</classpathScope>
                    <mainClass>com.learningstorm.kafka.KafkaTopology</mainClass>
                </configuration>
            </plugin>
   
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.7</version>
                <configuration>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

這裡要提一下,如果要打包部署到集群上的話,打包的插件需要使用 maven-shade-plugin 這個插件,然後使用 maven Lifecycle 中的 package 打包。而不是用 Maven-assembly-plugin 插件進行打包。

因為使用 Maven-assembly-plugin 的時候,會將所有依賴的包unpack,然後在pack,這樣就會出現,同樣的文件被覆蓋的情況。發佈到集群上的時候就會報 No FileSystem for scheme: hdfs 的錯 。

然後是使用 HdfsBolt 寫入 Hdfs。這裡來看看官方文檔中的例子吧。

// 使用 "|" 來替代 ",",來進行字元分割
RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter("|");

// 每輸入 1k 後將內容同步到 Hdfs 中
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// 當文件大小達到 5MB ,轉換寫入文件,即寫入到一個新的文件中
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

//當轉換寫入文件時,生成新文件的名字並使用
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/foo/");

HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:9000")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);

//生成該 bolt
topologyBuilder.setBolt("hdfsBolt", bolt, 5).globalGrouping("randomStrSpout");
        

到這裡就結束了。可以將 HdfsBolt 當作一個 Storm 中特殊一些的 bolt 即可。這個 bolt 的作用即使根據接收信息寫入 Hdfs。

而在新建 HdfsBolt 中,Storm 為我們提供了相當強的靈活性,我們可以定義一些策略,比如當達成某個條件的時候轉換寫入文件,新寫入文件的名字,寫入時候的分隔符等等。

如果選擇使用的話,Storm 有提供部分介面供我們使用,但如果我們覺得不夠豐富也可以自定義相應的類。下麵我們看看如何控制這些策略吧。

RecordFormat

這是一個介面,允許你自由定義接收到內容的格式。

public interface RecordFormat extends Serializable {
    byte[] format(Tuple tuple);
}

Storm 提供了 DelimitedRecordFormat ,使用方法在上面已經有了。這個類預設的分割符是逗號",",而你可以通過 withFieldDelimiter 方法改變分隔符。
如果你的初始分隔符不是逗號的話,那麼也可以重寫寫一個類實現 RecordFormat 介面即可。

FileNameFormat

同樣是一個介面。

public interface FileNameFormat extends Serializable {
    void prepare(Map conf, TopologyContext topologyContext);
    String getName(long rotation, long timeStamp);
    String getPath();
}

Storm 所提供的預設的是 org.apache.storm.hdfs.format.DefaultFileNameFormat 。預設人使用的轉換文件名有點長,格式是這樣的:

{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}

例如:

MyBolt-5-7-1390579837830.txt

預設情況下,首碼是空的,擴展標識是".txt"。

SyncPolicy

同步策略允許你將 buffered data 緩衝到 Hdfs 文件中(從而client可以讀取數據),通過實現org.apache.storm.hdfs.sync.SyncPolicy 介面:

public interface SyncPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}

FileRotationPolicy

這個介面允許你控制什麼情況下轉換寫入文件。

public interface FileRotationPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}

Storm 有提供三個實現該介面的類:

  • 最簡單的就是不進行轉換的org.apache.storm.hdfs.bolt.rotation.NoRotationPolicy ,就是什麼也不幹。

  • 通過文件大小觸發轉換的 org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy。

  • 通過時間條件來觸發轉換的 org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy。

如果有更加複雜的需求也可以自己定義。

RotationAction

這個主要是提供一個或多個 hook ,可加可不加。主要是在觸發寫入文件轉換的時候會啟動。

public interface RotationAction extends Serializable {
    void execute(FileSystem fileSystem, Path filePath) throws IOException;
}

三.實現一個例子

瞭解了上面的情況後,我們會實現一個例子,根據寫入記錄的多少來控制寫入轉換(改變寫入的文件),並且轉換後文件的名字表示當前是第幾次轉換。

首先來看看 HdfsBolt 的內容:

        RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(" ");
        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
//        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.KB);
        /** rotate file with Date,every month create a new file
         * format:yyyymm.txt
         */
        FileRotationPolicy rotationPolicy = new CountStrRotationPolicy();
        FileNameFormat fileNameFormat = new TimesFileNameFormat().withPath("/test/");
        RotationAction action = new NewFileAction();
        HdfsBolt bolt = new HdfsBolt()
                .withFsUrl("hdfs://127.0.0.1:9000")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy)
                .addRotationAction(action);

然後分別來看各個策略的類。

FileRotationPolicy

import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.tuple.Tuple;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 計數以改變Hdfs寫入文件的位置,當寫入10次的時候,則更改寫入文件,更改名字取決於 “TimesFileNameFormat”
 * 這個類是線程安全
 */

public class CountStrRotationPolicy implements FileRotationPolicy {


    private SimpleDateFormat df = new SimpleDateFormat("yyyyMM");

    private String date =  null;

    private int count = 0;

    public CountStrRotationPolicy(){
        this.date =  df.format(new Date());
//        this.date = df.format(new Date());
    }


    /**
     * Called for every tuple the HdfsBolt executes.
     *
     * @param tuple  The tuple executed.
     * @param offset current offset of file being written
     * @return true if a file rotation should be performed
     */
    @Override
    public boolean mark(Tuple tuple, long offset) {
        count ++;
        if(count == 10) {
            System.out.print("num :" +count + "   ");
            count = 0;
            return true;

        }
        else {
            return false;
        }
    }

    /**
     * Called after the HdfsBolt rotates a file.
     */
    @Override
    public void reset() {

    }

    @Override
    public FileRotationPolicy copy() {
        return new CountStrRotationPolicy();
    }


}

FileNameFormat


import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.task.TopologyContext;

import java.util.Map;

/**
 * 決定重新寫入文件時候的名字
 * 這裡會返回是第幾次轉換寫入文件,將這個第幾次做為文件名
 */
public class TimesFileNameFormat implements FileNameFormat {
    //預設路徑
    private String path = "/storm";
    //預設尾碼
    private String extension = ".txt";
    private Long times = new Long(0);

    public TimesFileNameFormat withPath(String path){
        this.path = path;
        return this;
    }

    @Override
    public void prepare(Map conf, TopologyContext topologyContext) {
    }


    @Override
    public String getName(long rotation, long timeStamp) {
        times ++ ;
        //返迴文件名,文件名為更換寫入文件次數
        return times.toString() + this.extension;
    }

    public String getPath(){
        return this.path;
    }
}

RotationAction


import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
/**
    當轉換寫入文件時候調用的 hook ,這裡僅寫入日誌。
 */
public class NewFileAction implements RotationAction {
    private static final Logger LOG = LoggerFactory.getLogger(NewFileAction.class);



    @Override
    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
        LOG.info("Hdfs change the written file!!");

        return;
    }
}

OK,這樣就大功告成了。通過上面的代碼,每接收到 10 個 Tuple 後就會轉換寫入文件,新文件的名字就是第幾次轉換。

完整代碼包括一個隨機生成字元串的 Spout ,可以到我的 github 上查看。

StormHdfsDemo:https://github.com/shezhiming/StormHdfsDemo


推薦閱讀:
Mysql 流增量寫入 Hdfs(一) --從 mysql 到 kafka
Spark SQL,如何將 DataFrame 轉為 json 格式


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • c# 添加引用時報錯:“未能正確載入“ReferenceManagerPackage”包”的解決方法 在添加應用的時候,右鍵點擊“引用”,選擇“添加引用”後,會提示“**未能正確載入ReferenceManagerPackage包**”之類的信息。具體的解決方法如下: (1)點擊開始按鈕,在搜索框中 ...
  • 配置bind 1、確定已經安裝bind軟體,需要安裝3 個bind、bind-chroot、bind-util [root@localhost wj]# yum install –y bind bind-chroot bind-util 2、修改配置文件“/etc/named.conf” [root ...
  • 簡介 mktemp命令用於創建一個臨時的文件或者目錄。 語法格式 示例 不帶選項和參數的mktemp用於創建臨時文件,帶-d選項用於創建臨時目錄。 創建完成後,會輸出臨時文件的絕對路徑。 當僅運行mktemp命令的時候,其等同於 這裡的XXX就表示隨機數,至少需要3個X。 -p DIR, --tmp ...
  • php7連接mysql測試代碼 ...
  • 簡介 從命令的名字上來看,會讓人誤以為這是一個和安裝相關的命令。 其實不然,install命令用於複製文件(cp)或創建空目錄(mkdir)並設置相關的屬性(chown、chmod)。 這裡的屬性包含了ownership、許可權以及時間戳(保留時間戳,而不是修改)。 語法格式 單源複製。一般省略掉-T ...
  • 在上章學習33.Linux-實現U盤自動掛載(詳解)後,只是講解了普通U盤掛載,並沒有涉及到多分區U盤,接下來本章來繼續學習 1.多分區U盤和普通U盤區別 1)U盤插上只會創建一個/dev/sda文件,這種一般表示該U盤沒有分區,這個sda文件便代表該U盤總大小,我們只需要掛載/dev/sda即可 ...
  • ubuntu16.4系統查看自啟服務: 需要自行安裝一個sysv-rc-conf的工具來查看: 查看自啟命令: Gentos6.8系統查看自啟服務:(註意這個只是查看用RPM包安裝的預設的獨立服務) 查看基於Xinetd的服務: 需要安裝Xinetd: 查詢系統中開啟的服務: ...
  • SQL Server系統表sysobjects 介紹 sysobjects 表結構: 列名 數據類型 描述 name sysname 對象名,常用列 id int 對象標識號 xtype char(2) 對象類型。常用列。xtype可以是下列對象類型中的一種: C = CHECK 約束 D = 預設 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...