前言 一、人物簡介 第一位閃亮登場,有請今後會一直教我們C語言的老師 —— 自在。 第二位上場的是和我們一起學習的小白程式猿 —— 逍遙。 二、構成和表達方式 位運算符是一組用於在二進位數之間進行操作的運算符 | 運算符 | 名稱 | 示例 | | : : | : : | : : | | & | 位 ...
前言
這周主要是學習使用Flink, 其中有一部分學習的內容就是生成parquet。 Flink自身提供的文檔寫了個大概,但是真要自己動手去生成pqrquet文件,發現還是有些小坑,本文就是記錄這些坑。
開始
官方文檔總是最好的開始的地方, 下麵是官方文檔上面的內容
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/filesystem/#file-sink
從官方文檔上面看,似乎很簡單, 使用FileSink, 然後設置下格式使用AvroParquetWriters就可以了。
但是按照這個設置後,連FileSink這個類都找不到。
FilkSink需要這個dependency,
AvroParquetWriters需要的是這個dependency
使用AVRO
官方文檔中使用了AvroParquetWriters, 那我們就先定義一個AVRO的schema文件MarketPrice.avsc,然後生成對應的類,
{
"namespace": "com.ken.parquet",
"type": "record",
"name": "MarketPrice",
"fields": [
{"name":"performance_id", "type":"string"},
{"name":"price_as_of_date", "type":"int", "logicalType": "date"},
{"name":"open_price", "type": ["null", "double"], "default": null},
{"name":"high_price", "type": ["null", "double"], "default": null},
{"name":"low_price", "type": ["null", "double"], "default": null},
{"name":"close_price", "type": ["null", "double"], "default": null}
]
}
然後加上Maven插件, 通過這個文件來生成Java類
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
添加好後,我們使用maven, compile的時候會生成對應的Java類。
編寫代碼
我們這裡不從外部讀取了,直接用env.fromCollection, 然後輸出到本地文件系統中
@Component
public class ParquetRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
List<MarketPrice> marketPriceList = new ArrayList<>();
MarketPrice marketPrice = new MarketPrice();
marketPrice.setPerformanceId("123456789");
marketPrice.setPriceAsOfDate(100);
marketPrice.setOpenPrice(100d);
marketPrice.setHighPrice(120d);
marketPrice.setLowPrice(99d);
marketPrice.setClosePrice(101.1d);
marketPriceList.add(marketPrice);
DataStream<MarketPrice> marketPriceDataStream = env.fromCollection(marketPriceList);
String localPath = "C:\\temp\\flink\\";
File outputParquetFile = new File(localPath);
String localURI = outputParquetFile.toURI().toString();
Path outputPath = new Path(localURI);
final FileSink<MarketPrice> sink = FileSink
.forBulkFormat(outputPath, AvroParquetWriters.forSpecificRecord(MarketPrice.class))
.build();
marketPriceDataStream.sinkTo(sink);
marketPriceDataStream.print();
env.execute();
}
}
代碼很簡單,就是初始化DataStream, 然後Sink到本地。
運行程式報錯
“Caused by: java.lang.RuntimeException: Could not load the AvroTypeInfo class. You may be missing the 'flink-avro' dependency”
添加dependency
繼續運行,繼續報錯
“Caused by: java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter”
查找了一番,添加這個dependency
繼續運行, 繼續報錯
“Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration”
看起來還需要hadoop的東西,這裡可以添加
正好我們後面需要生成到S3,我找到了這個
這樣可以不用上面hadoop-core了,
繼續運行,繼續報錯
“Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/mapreduce/lib/output/FileOutputFormat”
加上這個dependency
運行,成功,生成了parquet file, 如下圖
如果要生成到AWS的S3上面去,只需要把Path換下, 很簡單。 當然你需要有AWS的許可權,我這裡直接通過IDEA啟動Environment variables裡面加上AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY,AWS_SESSION_TOKEN。
String s3Path = "s3a://yourbucket/yourkey/";
Path outputPath = new Path(s3Path);
final FileSink<MarketPrice> sink = FileSink
.forBulkFormat(outputPath, AvroParquetWriters.forSpecificRecord(MarketPrice.class))
.build();
總結
這些dependency的依賴,你要是缺少了,運行起來就會缺東少西,然後花時間去找,還蠻廢時間的。官方文檔往往又沒有那麼細,所以算是一些小小的坑,好在都解決了,順利的用Flink生成了Parquet file, 比較完成的POM文件列在這裡
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.3.5</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-s3-fs-hadoop</artifactId>
<version>${flink.version}</version>
</dependency>