一. 概述 上一篇我們介紹瞭如何將數據從 mysql 拋到 kafka,這次我們就專註於利用 storm 將數據寫入到 hdfs 的過程,由於 storm 寫入 hdfs 的可定製東西有些多,我們先不從 kafka 讀取,而先自己定義一個 Spout 數據充當數據源,下章再進行整合。這裡預設你是擁有 ...
一. 概述
上一篇我們介紹瞭如何將數據從 mysql 拋到 kafka,這次我們就專註於利用 storm 將數據寫入到 hdfs 的過程,由於 storm 寫入 hdfs 的可定製東西有些多,我們先不從 kafka 讀取,而先自己定義一個 Spout 數據充當數據源,下章再進行整合。這裡預設你是擁有一定的 storm 知識的基礎,起碼知道 Spout 和 bolt 是什麼。
寫入 hdfs 可以有以下的定製策略:
- 自定義寫入文件的名字
- 定義寫入內容格式
- 滿足給定條件後更改寫入的文件
- 更改寫入文件時觸發的 Action
本篇會先說明如何用 storm 寫入 HDFS,寫入過程一些 API 的描述,以及最後給定一個例子:
storm 每接收到 10 個 Tuple 後就會改變 hdfs 寫入文件,新文件的名字就是第幾次改變。
ps:storm 版本:1.1.1 。Hadoop 版本:2.7.4 。
接下來我們首先看看 Storm 如何寫入 HDFS 。
二. Storm 寫入 HDFS
Storm 官方有提供了相應的 API 讓我們可以使用。可以通過創建 HdfsBolt 以及定義相應的規則,即可寫入 HDFS 。
首先通過 maven 配置依賴以及插件。
<properties>
<storm.version>1.1.1</storm.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<!--<scope>provided</scope>-->
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>15.0</version>
</dependency>
<!--hadoop模塊-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.4</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.4</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-hdfs -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hdfs</artifactId>
<version>1.1.1</version>
<!--<scope>test</scope>-->
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<classpathScope>compile</classpathScope>
<mainClass>com.learningstorm.kafka.KafkaTopology</mainClass>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.7</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
這裡要提一下,如果要打包部署到集群上的話,打包的插件需要使用 maven-shade-plugin 這個插件,然後使用 maven Lifecycle 中的 package 打包。而不是用 Maven-assembly-plugin 插件進行打包。
因為使用 Maven-assembly-plugin 的時候,會將所有依賴的包unpack,然後在pack,這樣就會出現,同樣的文件被覆蓋的情況。發佈到集群上的時候就會報 No FileSystem for scheme: hdfs 的錯 。
然後是使用 HdfsBolt 寫入 Hdfs。這裡來看看官方文檔中的例子吧。
// 使用 "|" 來替代 ",",來進行字元分割
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter("|");
// 每輸入 1k 後將內容同步到 Hdfs 中
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// 當文件大小達到 5MB ,轉換寫入文件,即寫入到一個新的文件中
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
//當轉換寫入文件時,生成新文件的名字並使用
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/foo/");
HdfsBolt bolt = new HdfsBolt()
.withFsUrl("hdfs://localhost:9000")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);
//生成該 bolt
topologyBuilder.setBolt("hdfsBolt", bolt, 5).globalGrouping("randomStrSpout");
到這裡就結束了。可以將 HdfsBolt 當作一個 Storm 中特殊一些的 bolt 即可。這個 bolt 的作用即使根據接收信息寫入 Hdfs。
而在新建 HdfsBolt 中,Storm 為我們提供了相當強的靈活性,我們可以定義一些策略,比如當達成某個條件的時候轉換寫入文件,新寫入文件的名字,寫入時候的分隔符等等。
如果選擇使用的話,Storm 有提供部分介面供我們使用,但如果我們覺得不夠豐富也可以自定義相應的類。下麵我們看看如何控制這些策略吧。
RecordFormat
這是一個介面,允許你自由定義接收到內容的格式。
public interface RecordFormat extends Serializable {
byte[] format(Tuple tuple);
}
Storm 提供了 DelimitedRecordFormat ,使用方法在上面已經有了。這個類預設的分割符是逗號",",而你可以通過 withFieldDelimiter 方法改變分隔符。
如果你的初始分隔符不是逗號的話,那麼也可以重寫寫一個類實現 RecordFormat 介面即可。
FileNameFormat
同樣是一個介面。
public interface FileNameFormat extends Serializable {
void prepare(Map conf, TopologyContext topologyContext);
String getName(long rotation, long timeStamp);
String getPath();
}
Storm 所提供的預設的是 org.apache.storm.hdfs.format.DefaultFileNameFormat 。預設人使用的轉換文件名有點長,格式是這樣的:
{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}
例如:
MyBolt-5-7-1390579837830.txt
預設情況下,首碼是空的,擴展標識是".txt"。
SyncPolicy
同步策略允許你將 buffered data 緩衝到 Hdfs 文件中(從而client可以讀取數據),通過實現org.apache.storm.hdfs.sync.SyncPolicy 介面:
public interface SyncPolicy extends Serializable {
boolean mark(Tuple tuple, long offset);
void reset();
}
FileRotationPolicy
這個介面允許你控制什麼情況下轉換寫入文件。
public interface FileRotationPolicy extends Serializable {
boolean mark(Tuple tuple, long offset);
void reset();
}
Storm 有提供三個實現該介面的類:
最簡單的就是不進行轉換的org.apache.storm.hdfs.bolt.rotation.NoRotationPolicy ,就是什麼也不幹。
通過文件大小觸發轉換的 org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy。
通過時間條件來觸發轉換的 org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy。
如果有更加複雜的需求也可以自己定義。
RotationAction
這個主要是提供一個或多個 hook ,可加可不加。主要是在觸發寫入文件轉換的時候會啟動。
public interface RotationAction extends Serializable {
void execute(FileSystem fileSystem, Path filePath) throws IOException;
}
三.實現一個例子
瞭解了上面的情況後,我們會實現一個例子,根據寫入記錄的多少來控制寫入轉換(改變寫入的文件),並且轉換後文件的名字表示當前是第幾次轉換。
首先來看看 HdfsBolt 的內容:
RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(" ");
// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.KB);
/** rotate file with Date,every month create a new file
* format:yyyymm.txt
*/
FileRotationPolicy rotationPolicy = new CountStrRotationPolicy();
FileNameFormat fileNameFormat = new TimesFileNameFormat().withPath("/test/");
RotationAction action = new NewFileAction();
HdfsBolt bolt = new HdfsBolt()
.withFsUrl("hdfs://127.0.0.1:9000")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy)
.addRotationAction(action);
然後分別來看各個策略的類。
FileRotationPolicy
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.tuple.Tuple;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 計數以改變Hdfs寫入文件的位置,當寫入10次的時候,則更改寫入文件,更改名字取決於 “TimesFileNameFormat”
* 這個類是線程安全
*/
public class CountStrRotationPolicy implements FileRotationPolicy {
private SimpleDateFormat df = new SimpleDateFormat("yyyyMM");
private String date = null;
private int count = 0;
public CountStrRotationPolicy(){
this.date = df.format(new Date());
// this.date = df.format(new Date());
}
/**
* Called for every tuple the HdfsBolt executes.
*
* @param tuple The tuple executed.
* @param offset current offset of file being written
* @return true if a file rotation should be performed
*/
@Override
public boolean mark(Tuple tuple, long offset) {
count ++;
if(count == 10) {
System.out.print("num :" +count + " ");
count = 0;
return true;
}
else {
return false;
}
}
/**
* Called after the HdfsBolt rotates a file.
*/
@Override
public void reset() {
}
@Override
public FileRotationPolicy copy() {
return new CountStrRotationPolicy();
}
}
FileNameFormat
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.task.TopologyContext;
import java.util.Map;
/**
* 決定重新寫入文件時候的名字
* 這裡會返回是第幾次轉換寫入文件,將這個第幾次做為文件名
*/
public class TimesFileNameFormat implements FileNameFormat {
//預設路徑
private String path = "/storm";
//預設尾碼
private String extension = ".txt";
private Long times = new Long(0);
public TimesFileNameFormat withPath(String path){
this.path = path;
return this;
}
@Override
public void prepare(Map conf, TopologyContext topologyContext) {
}
@Override
public String getName(long rotation, long timeStamp) {
times ++ ;
//返迴文件名,文件名為更換寫入文件次數
return times.toString() + this.extension;
}
public String getPath(){
return this.path;
}
}
RotationAction
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
/**
當轉換寫入文件時候調用的 hook ,這裡僅寫入日誌。
*/
public class NewFileAction implements RotationAction {
private static final Logger LOG = LoggerFactory.getLogger(NewFileAction.class);
@Override
public void execute(FileSystem fileSystem, Path filePath) throws IOException {
LOG.info("Hdfs change the written file!!");
return;
}
}
OK,這樣就大功告成了。通過上面的代碼,每接收到 10 個 Tuple 後就會轉換寫入文件,新文件的名字就是第幾次轉換。
完整代碼包括一個隨機生成字元串的 Spout ,可以到我的 github 上查看。
StormHdfsDemo:https://github.com/shezhiming/StormHdfsDemo
推薦閱讀:
Mysql 流增量寫入 Hdfs(一) --從 mysql 到 kafka
Spark SQL,如何將 DataFrame 轉為 json 格式