Abstract: Hudi currently only supports reading and writing data through Flink SQL, but in real-world projects some customers need to read and write Hudi with the Flink DataStream API.
This article is shared from the Huawei Cloud community post 《FusionInsight MRS Flink DataStream API讀寫Hudi實踐》 by yangxiao_mrs.
Hudi currently only supports data reads and writes through Flink SQL, yet in practice some customers' projects require the Flink DataStream API to read and write Hudi.
This practice consists of three parts:
1) HoodiePipeline.java, which wraps Hudi's core read/write interfaces and exposes a Hudi DataStream API.
2) WriteIntoHudi.java, which writes data into Hudi using the DataStream API.
3) ReadFromHudi.java, which reads data from Hudi using the DataStream API.
1. HoodiePipeline.java wraps Hudi's core read/write interfaces and exposes a Hudi DataStream API. Key implementation logic:
Step 1: Take the column names, primary key, and partition key set on the builder for the Hudi streaming table and concatenate them into a CREATE TABLE statement with a StringBuilder (a sample of the generated DDL follows this list).
Step 2: Register that Hudi streaming table in the catalog.
Step 3: Convert the DynamicTable into a DataStreamProvider, then produce or consume the data.
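As an illustration, for the builder example in the class Javadoc below (columns f0/f1/f2, primary key f0,f1, partition f2), the DDL assembled in step 1 would look roughly like the following; the path option is a hypothetical stand-in for whatever options are set on the builder:

create table myTable(
  f0 int,
  f1 varchar(10),
  f2 varchar(20),
  PRIMARY KEY(f0,f1) NOT ENFORCED
)
PARTITIONED BY (`f2`)
with ('connector' = 'hudi',
  'path' = '/tmp/hudi/myTable'
)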
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTableFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * A tool class to construct hoodie flink pipeline.
 *
 * <p>How to use ?</p>
 * Method {@link #builder(String)} returns a pipeline builder. The builder
 * can then define the hudi table columns, primary keys and partitions.
 *
 * <p>An example:</p>
 * <pre>
 *   HoodiePipeline.Builder builder = HoodiePipeline.builder("myTable");
 *   DataStreamSink<?> sinkStream = builder
 *       .column("f0 int")
 *       .column("f1 varchar(10)")
 *       .column("f2 varchar(20)")
 *       .pk("f0,f1")
 *       .partition("f2")
 *       .sink(input, false);
 * </pre>
 */
public class HoodiePipeline {

  /**
   * Returns the builder for hoodie pipeline construction.
   */
  public static Builder builder(String tableName) {
    return new Builder(tableName);
  }

  /**
   * Builder for hudi source/sink pipeline construction.
   */
  public static class Builder {
    private final String tableName;
    private final List<String> columns;
    private final Map<String, String> options;

    private String pk;
    private List<String> partitions;

    private Builder(String tableName) {
      this.tableName = tableName;
      this.columns = new ArrayList<>();
      this.options = new HashMap<>();
      this.partitions = new ArrayList<>();
    }

    /**
     * Add a table column definition.
     *
     * @param column the column format should be in the form like 'f0 int'
     */
    public Builder column(String column) {
      this.columns.add(column);
      return this;
    }

    /**
     * Add primary keys.
     */
    public Builder pk(String... pks) {
      this.pk = String.join(",", pks);
      return this;
    }

    /**
     * Add partition fields.
     */
    public Builder partition(String... partitions) {
      this.partitions = new ArrayList<>(Arrays.asList(partitions));
      return this;
    }

    /**
     * Add a config option.
     */
    public Builder option(ConfigOption<?> option, Object val) {
      this.options.put(option.key(), val.toString());
      return this;
    }

    public Builder option(String key, Object val) {
      this.options.put(key, val.toString());
      return this;
    }

    public Builder options(Map<String, String> options) {
      this.options.putAll(options);
      return this;
    }

    public DataStreamSink<?> sink(DataStream<RowData> input, boolean bounded) {
      TableDescriptor tableDescriptor = getTableDescriptor();
      return HoodiePipeline.sink(input, tableDescriptor.getTableId(), tableDescriptor.getCatalogTable(), bounded);
    }

    public TableDescriptor getTableDescriptor() {
      EnvironmentSettings environmentSettings = EnvironmentSettings
          .newInstance()
          .build();
      TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create(environmentSettings);
      String sql = getCreateHoodieTableDDL(this.tableName, this.columns, this.options, this.pk, this.partitions);
      tableEnv.executeSql(sql);
      String currentCatalog = tableEnv.getCurrentCatalog();
      CatalogTable catalogTable = null;
      String defaultDatabase = null;
      try {
        Catalog catalog = tableEnv.getCatalog(currentCatalog).get();
        defaultDatabase = catalog.getDefaultDatabase();
        catalogTable = (CatalogTable) catalog.getTable(new ObjectPath(defaultDatabase, this.tableName));
      } catch (TableNotExistException e) {
        throw new HoodieException("Create table " + this.tableName + " exception", e);
      }
      ObjectIdentifier tableId = ObjectIdentifier.of(currentCatalog, defaultDatabase, this.tableName);
      return new TableDescriptor(tableId, catalogTable);
    }

    public DataStream<RowData> source(StreamExecutionEnvironment execEnv) {
      TableDescriptor tableDescriptor = getTableDescriptor();
      return HoodiePipeline.source(execEnv, tableDescriptor.tableId, tableDescriptor.getCatalogTable());
    }
  }

  private static String getCreateHoodieTableDDL(
      String tableName,
      List<String> fields,
      Map<String, String> options,
      String pkField,
      List<String> partitionField) {
    StringBuilder builder = new StringBuilder();
    builder.append("create table ")
        .append(tableName)
        .append("(\n");
    for (String field : fields) {
      builder.append("  ")
          .append(field)
          .append(",\n");
    }
    builder.append("  PRIMARY KEY(")
        .append(pkField)
        .append(") NOT ENFORCED\n")
        .append(")\n");
    if (!partitionField.isEmpty()) {
      String partitions = partitionField
          .stream()
          .map(partitionName -> "`" + partitionName + "`")
          .collect(Collectors.joining(","));
      builder.append("PARTITIONED BY (")
          .append(partitions)
          .append(")\n");
    }
    builder.append("with ('connector' = 'hudi'");
    options.forEach((k, v) -> builder
        .append(",\n")
        .append("  '")
        .append(k)
        .append("' = '")
        .append(v)
        .append("'"));
    builder.append("\n)");
    System.out.println(builder.toString());
    return builder.toString();
  }

  /**
   * Returns the data stream sink with given catalog table.
   *
   * @param input        The input datastream
   * @param tablePath    The table path to the hoodie table in the catalog
   * @param catalogTable The hoodie catalog table
   * @param isBounded    A flag indicating whether the input data stream is bounded
   */
  private static DataStreamSink<?> sink(DataStream<RowData> input, ObjectIdentifier tablePath, CatalogTable catalogTable, boolean isBounded) {
    DefaultDynamicTableContext context = new DefaultDynamicTableContext(tablePath, catalogTable,
        Configuration.fromMap(catalogTable.getOptions()), Thread.currentThread().getContextClassLoader(), false);
    HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
    return ((DataStreamSinkProvider) hoodieTableFactory.createDynamicTableSink(context)
        .getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)))
        .consumeDataStream(input);
  }

  /**
   * Returns the data stream source with given catalog table.
   *
   * @param execEnv      The execution environment
   * @param tablePath    The table path to the hoodie table in the catalog
   * @param catalogTable The hoodie catalog table
   */
  private static DataStream<RowData> source(StreamExecutionEnvironment execEnv, ObjectIdentifier tablePath, CatalogTable catalogTable) {
    DefaultDynamicTableContext context = new DefaultDynamicTableContext(tablePath, catalogTable,
        Configuration.fromMap(catalogTable.getOptions()), Thread.currentThread().getContextClassLoader(), false);
    HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
    DataStreamScanProvider dataStreamScanProvider = (DataStreamScanProvider) ((ScanTableSource) hoodieTableFactory
        .createDynamicTableSource(context))
        .getScanRuntimeProvider(new ScanRuntimeProviderContext());
    return dataStreamScanProvider.produceDataStream(execEnv);
  }

  /***
   * A POJO that contains tableId and resolvedCatalogTable.
   */
  public static class TableDescriptor {
    private ObjectIdentifier tableId;
    private CatalogTable catalogTable;

    public TableDescriptor(ObjectIdentifier tableId, CatalogTable catalogTable) {
      this.tableId = tableId;
      this.catalogTable = catalogTable;
    }

    public ObjectIdentifier getTableId() {
      return tableId;
    }

    public CatalogTable getCatalogTable() {
      return catalogTable;
    }
  }

  private static class DefaultDynamicTableContext implements DynamicTableFactory.Context {

    private final ObjectIdentifier objectIdentifier;
    private final CatalogTable catalogTable;
    private final ReadableConfig configuration;
    private final ClassLoader classLoader;
    private final boolean isTemporary;

    DefaultDynamicTableContext(
        ObjectIdentifier objectIdentifier,
        CatalogTable catalogTable,
        ReadableConfig configuration,
        ClassLoader classLoader,
        boolean isTemporary) {
      this.objectIdentifier = objectIdentifier;
      this.catalogTable = catalogTable;
      this.configuration = configuration;
      this.classLoader = classLoader;
      this.isTemporary = isTemporary;
    }

    @Override
    public ObjectIdentifier getObjectIdentifier() {
      return objectIdentifier;
    }

    @Override
    public CatalogTable getCatalogTable() {
      return catalogTable;
    }

    @Override
    public ReadableConfig getConfiguration() {
      return configuration;
    }

    @Override
    public ClassLoader getClassLoader() {
      return classLoader;
    }

    @Override
    public boolean isTemporary() {
      return isTemporary;
    }
  }
}
2. WriteIntoHudi.java writes data into Hudi using the DataStream API. Key implementation logic:
Step 1: The demo's source data comes from a datagen connector table.
Step 2: Convert the Table into a DataStream with toAppendStream.
Step 3: Build the Hudi sink stream and write the data into Hudi.
In real projects you can also feed the Hudi sink directly from an existing DataStream; a sketch of that variant follows the code below.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;

import java.util.HashMap;
import java.util.Map;

public class WriteIntoHudi {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    env.getCheckpointConfig().setCheckpointInterval(10000);

    tableEnv.executeSql("CREATE TABLE datagen (\n"
        + "  uuid varchar(20),\n"
        + "  name varchar(10),\n"
        + "  age int,\n"
        + "  ts timestamp(3),\n"
        + "  p varchar(20)\n"
        + ") WITH (\n"
        + "  'connector' = 'datagen',\n"
        + "  'rows-per-second' = '5'\n"
        + ")");

    Table table = tableEnv.sqlQuery("SELECT * FROM datagen");
    DataStream<RowData> dataStream = tableEnv.toAppendStream(table, RowData.class);

    String targetTable = "hudiSinkTable";
    String basePath = "hdfs://hacluster/tmp/flinkHudi/hudiTable";

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), basePath);
    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
    options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");

    HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
        .column("uuid VARCHAR(20)")
        .column("name VARCHAR(10)")
        .column("age INT")
        .column("ts TIMESTAMP(3)")
        .column("p VARCHAR(20)")
        .pk("uuid")
        .partition("p")
        .options(options);

    builder.sink(dataStream, false); // the second parameter indicates whether the input data stream is bounded
    env.execute("Api_Sink");
  }
}
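For the direct-DataStream variant, the only change is how the DataStream<RowData> is produced. Below is a minimal sketch, an illustration rather than code from the original article: it builds RowData records by hand and feeds them to the same HoodiePipeline sink. The single-column schema, table name, and path here are hypothetical.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hudi.configuration.FlinkOptions;

import java.util.HashMap;
import java.util.Map;

public class WriteNativeStreamIntoHudi {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getCheckpointConfig().setCheckpointInterval(10000); // Hudi commits data on checkpoints

    // Build RowData records directly instead of converting from a Table.
    DataStream<RowData> input = env
        .fromElements("id1", "id2", "id3")
        .map(uuid -> (RowData) GenericRowData.of(StringData.fromString(uuid)))
        .returns(TypeInformation.of(RowData.class));

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), "hdfs://hacluster/tmp/flinkHudi/nativeTable"); // hypothetical path
    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "uuid");

    HoodiePipeline.builder("hudiNativeSinkTable")
        .column("uuid VARCHAR(20)")
        .pk("uuid")
        .options(options)
        .sink(input, false); // false: the input stream is unbounded

    env.execute("Native_Api_Sink");
  }
}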
3. ReadFromHudi.java reads data from Hudi using the DataStream API. Key implementation logic:
Step 1: Build the Hudi source stream and read the Hudi data.
Step 2: Convert the stream into a Table with fromDataStream.
Step 3: Print the Hudi table data using the print connector.
In real projects you can also consume the Hudi data directly and write it to a downstream DataStream sink; a sketch of that variant follows the code below.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;

import java.util.HashMap;
import java.util.Map;

public class ReadFromHudi {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    String targetTable = "hudiSourceTable";
    String basePath = "hdfs://hacluster/tmp/flinkHudi/hudiTable";

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), basePath);
    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); // this option enables streaming read
    options.put("read.streaming.start-commit", "20210316134557"); // specifies the start commit instant time

    HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
        .column("uuid VARCHAR(20)")
        .column("name VARCHAR(10)")
        .column("age INT")
        .column("ts TIMESTAMP(3)")
        .column("p VARCHAR(20)")
        .pk("uuid")
        .partition("p")
        .options(options);

    DataStream<RowData> rowDataDataStream = builder.source(env);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    Table table = tableEnv.fromDataStream(rowDataDataStream, "uuid, name, age, ts, p");
    tableEnv.registerTable("hudiSourceTable", table);

    tableEnv.executeSql("CREATE TABLE print (\n"
        + "  uuid varchar(20),\n"
        + "  name varchar(10),\n"
        + "  age int,\n"
        + "  ts timestamp(3),\n"
        + "  p varchar(20)\n"
        + ") WITH (\n"
        + "  'connector' = 'print'\n"
        + ")");
    tableEnv.executeSql("insert into print select * from hudiSourceTable");
    env.execute("Api_Source");
  }
}
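If you want to stay in the DataStream API end to end, you can operate on the source stream directly instead of registering a table. A minimal sketch follows, an illustration rather than code from the original article; it keeps the same schema and path as above and simply maps and prints the rows.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.configuration.FlinkOptions;

import java.util.HashMap;
import java.util.Map;

public class ReadFromHudiDirect {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), "hdfs://hacluster/tmp/flinkHudi/hudiTable");
    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");

    DataStream<RowData> rows = HoodiePipeline.builder("hudiSourceTable")
        .column("uuid VARCHAR(20)")
        .column("name VARCHAR(10)")
        .column("age INT")
        .column("ts TIMESTAMP(3)")
        .column("p VARCHAR(20)")
        .pk("uuid")
        .partition("p")
        .options(options)
        .source(env);

    // Work on the rows directly, e.g. extract uuid (index 0) and name (index 1).
    rows.map(row -> row.getString(0) + "," + row.getString(1))
        .returns(Types.STRING)
        .print();

    env.execute("Api_Source_Direct");
  }
}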
4. If your project also needs to parse complex JSON from Kafka, there are two options:
1) Use Flink SQL: https://bbs.huaweicloud.com/forum/thread-153494-1-1.html
2) Implement the parsing with a Flink DataStream MapFunction, as sketched below.
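A minimal sketch of option 2, an assumption rather than code from the article: a RichMapFunction that uses Jackson (assumed to be on the classpath) to flatten a nested JSON message into a RowData matching the Hudi table schema. The payload layout and field names here are hypothetical.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

public class JsonToRowData extends RichMapFunction<String, RowData> {
  private transient ObjectMapper mapper;

  @Override
  public void open(Configuration parameters) {
    mapper = new ObjectMapper(); // created per task, since ObjectMapper is not serializable
  }

  @Override
  public RowData map(String json) throws Exception {
    JsonNode root = mapper.readTree(json);
    // Flatten a hypothetical nested payload {"user": {"id": ..., "name": ...}}
    JsonNode user = root.get("user");
    return GenericRowData.of(
        StringData.fromString(user.get("id").asText()),
        StringData.fromString(user.get("name").asText()));
  }
}

The resulting stream, e.g. kafkaStream.map(new JsonToRowData()).returns(TypeInformation.of(RowData.class)) where kafkaStream is your Kafka source stream, can then be handed to the HoodiePipeline sink exactly as in WriteIntoHudi above.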