Flink 生成ParquetFile

来源:https://www.cnblogs.com/dk168/archive/2023/04/08/17298265.html
-Advertisement-
Play Games

前言 一、人物簡介 第一位閃亮登場,有請今後會一直教我們C語言的老師 —— 自在。 第二位上場的是和我們一起學習的小白程式猿 —— 逍遙。 二、構成和表達方式 位運算符是一組用於在二進位數之間進行操作的運算符 | 運算符 | 名稱 | 示例 | | : : | : : | : : | | & | 位 ...


前言

這周主要是學習使用Flink, 其中有一部分學習的內容就是生成parquet。 Flink自身提供的文檔寫了個大概,但是真要自己動手去生成pqrquet文件,發現還是有些小坑,本文就是記錄這些坑。

開始

官方文檔總是最好的開始的地方, 下麵是官方文檔上面的內容
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/filesystem/#file-sink
image
從官方文檔上面看,似乎很簡單, 使用FileSink, 然後設置下格式使用AvroParquetWriters就可以了。
但是按照這個設置後,連FileSink這個類都找不到。
FilkSink需要這個dependency,

org.apache.flink
flink-connector-files
${flink.version}

AvroParquetWriters需要的是這個dependency

org.apache.flink
flink-parquet
${flink.version}
provided

使用AVRO

官方文檔中使用了AvroParquetWriters, 那我們就先定義一個AVRO的schema文件MarketPrice.avsc,然後生成對應的類,

{
  "namespace": "com.ken.parquet",
  "type": "record",
  "name": "MarketPrice",
  "fields": [
    {"name":"performance_id", "type":"string"},
    {"name":"price_as_of_date", "type":"int", "logicalType": "date"},
    {"name":"open_price", "type": ["null", "double"], "default": null},
    {"name":"high_price", "type": ["null", "double"], "default": null},
    {"name":"low_price", "type": ["null", "double"], "default": null},
    {"name":"close_price", "type": ["null", "double"], "default": null}
  ]
}

然後加上Maven插件, 通過這個文件來生成Java類

            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

添加好後,我們使用maven, compile的時候會生成對應的Java類。

編寫代碼

我們這裡不從外部讀取了,直接用env.fromCollection, 然後輸出到本地文件系統中


@Component
public class ParquetRunner implements ApplicationRunner {
    @Override
    public void run(ApplicationArguments args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<MarketPrice> marketPriceList = new ArrayList<>();
        MarketPrice marketPrice = new MarketPrice();
        marketPrice.setPerformanceId("123456789");
        marketPrice.setPriceAsOfDate(100);
        marketPrice.setOpenPrice(100d);
        marketPrice.setHighPrice(120d);
        marketPrice.setLowPrice(99d);
        marketPrice.setClosePrice(101.1d);
        marketPriceList.add(marketPrice);

        DataStream<MarketPrice>  marketPriceDataStream = env.fromCollection(marketPriceList);

        String localPath = "C:\\temp\\flink\\";

        File outputParquetFile = new File(localPath);

        String localURI = outputParquetFile.toURI().toString();
        Path outputPath = new Path(localURI);

        final FileSink<MarketPrice> sink = FileSink
                .forBulkFormat(outputPath, AvroParquetWriters.forSpecificRecord(MarketPrice.class))
                .build();

        marketPriceDataStream.sinkTo(sink);

        marketPriceDataStream.print();

        env.execute();

    }
}

代碼很簡單,就是初始化DataStream, 然後Sink到本地。
運行程式報錯
“Caused by: java.lang.RuntimeException: Could not load the AvroTypeInfo class. You may be missing the 'flink-avro' dependency”
添加dependency

org.apache.flink
flink-avro
${flink.version}
provided

繼續運行,繼續報錯
“Caused by: java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter”
查找了一番,添加這個dependency

org.apache.parquet
parquet-avro
1.12.3

繼續運行, 繼續報錯
“Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration”
看起來還需要hadoop的東西,這裡可以添加

org.apache.hadoop
hadoop-core

正好我們後面需要生成到S3,我找到了這個

org.apache.flink
flink-s3-fs-hadoop
${flink.version}

這樣可以不用上面hadoop-core了,
繼續運行,繼續報錯
“Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/mapreduce/lib/output/FileOutputFormat”
加上這個dependency

org.apache.hadoop
hadoop-mapreduce-client-core
3.3.5

運行,成功,生成了parquet file, 如下圖
image

如果要生成到AWS的S3上面去,只需要把Path換下, 很簡單。 當然你需要有AWS的許可權,我這裡直接通過IDEA啟動Environment variables裡面加上AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY,AWS_SESSION_TOKEN。

        String s3Path = "s3a://yourbucket/yourkey/";
        Path outputPath = new Path(s3Path);
		final FileSink<MarketPrice> sink = FileSink
                .forBulkFormat(outputPath, AvroParquetWriters.forSpecificRecord(MarketPrice.class))
                .build();

總結

這些dependency的依賴,你要是缺少了,運行起來就會缺東少西,然後花時間去找,還蠻廢時間的。官方文檔往往又沒有那麼細,所以算是一些小小的坑,好在都解決了,順利的用Flink生成了Parquet file, 比較完成的POM文件列在這裡

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.12.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-s3-fs-hadoop</artifactId>
            <version>${flink.version}</version>
        </dependency>

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 函數式語言特性:-迭代器和閉包 本章內容 閉包(closures) 迭代器(iterators) 優化改善 12 章的實例項目 討論閉包和迭代器的運行時性能 一、閉包(1)- 使用閉包創建抽象行為 什麼是閉包(closure) 閉包:可以捕獲其所在環境的匿名函數。 閉包: 是匿名函數 保存為變數、作 ...
  • 承接上文 承接上一篇文章【演算法數據結構專題】「延時隊列演算法」史上手把手教你針對層級時間輪(TimingWheel)實現延時隊列的開發實戰落地(上)】我們基本上對層級時間輪演算法的基本原理有了一定的認識,本章節就從落地的角度進行分析和介紹如何通過Java進行實現一個屬於我們自己的時間輪服務組件,最後,在 ...
  • 實現一個簡單的UDP通信程式,僅作為筆記使用 網路編程中有三要素:IP、埠號和通信協議,分別用來確定對方在互聯網上的地址、指定接受數據的軟體和確定數據在網路中傳輸的規則。 IP地址 IP地址分為IPv4地址和IPv6地址,這裡不做討論。 IPv4地址中分為公網地址(萬維網使用)和私有地址(區域網使 ...
  • 第二章 線程管控 std::thread 簡介 構造和析構函數 /// 預設構造 /// 創建一個線程,什麼也不做 thread() noexcept; /// 帶參構造 /// 創建一個線程,以 A 為參數執行 F 函數 template <class Fn, class... Args> exp ...
  • 作者:袁首京 原創文章,轉載時請保留此聲明,並給出原文連接。 元編程並不象它聽起來那麼時髦和新奇。常用的 decorator 就可以認為是一種元編程。簡單來說,元編程就是編寫操作代碼的代碼。 有點繞,是吧?彆著急,咱們一點一點來討論。 註意:本文中的代碼適用於 Python 3.3 及以上。 元類 ...
  • 哈嘍大家好,我是鹹魚 在《Flask Web 開髮指南 pt.1》中,鹹魚跟大家介紹了 Flask 的由來——誕生於一個愚人節玩笑,簡單介紹了一些關於 Flask 的概念,並且編寫了一個簡單的 Flask 程式 在編寫 Flask 程式的時候,你需要註意你的程式文件不要命名為 flask.py,建議 ...
  • Java之SPI機制詳解 1: SPI機制簡介 SPI 全稱是 Service Provider Interface,是一種 JDK 內置的動態載入實現擴展點的機制,通過 SPI 技術我們可以動態獲取介面的實現類,不用自己來創建。這個不是什麼特別的技術,只是 一種設計理念。 2: SPI原理 Jav ...
  • 一、為什麼要確定付費客戶特征? 先講個案例,以 Shopify 網站為例進行分析。該網站提供了許多功能,圍繞著潛在客戶在全生命周期中所需的業務需求,包括從創建業務開始、賺取收益等整個閉環鏈上所需的任何工具,如: 開始做生意:Business name generator 線上工具、功能變數名稱選擇頁面、Bu ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...