1. Development Background
The officially open-sourced version of DataX supports reading and writing HDFS files, but as of this writing it does not support reading or writing Parquet files. Thanks to DataX's excellent data-synchronization performance, most of our company's projects last year adopted it as the data-synchronization tool. Two common scenarios, however, were not covered: synchronizing Parquet data out of a CDH cluster, and writing data from other sources into HDFS in Parquet format. So we had to do it ourselves and extend the HdfsReader and HdfsWriter plugins to support reading and writing Parquet files.
2. The HdfsReader Plugin
This plugin is fairly simple and consists of five classes in total. The classes and their corresponding changes are:
- DFSUtil: add a method that checks whether a file is a Parquet file, and a method that reads a Parquet file and converts its records.
- HdfsConstant: add an entry for the Parquet file type.
- HdfsReader: add a branch for the case where the configured file type is Parquet (a rough sketch of this change and the HdfsConstant change follows this list).
- HdfsReaderErrorCode: no changes needed.
- Type: no changes needed.
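Before the full code, here is a minimal sketch of what the two smaller changes amount to, assuming a string-constant style HdfsConstant and a fileType dispatch inside the HdfsReader task. The constant names and the dispatch site below are illustrative assumptions, not the actual source; parquetFileStartRead is the new DFSUtil method shown further down.

```java
// Illustrative sketch only -- constant names and the dispatch placement are assumptions.
public class HdfsConstant {
    public static final String TEXT = "TEXT";
    public static final String ORC = "ORC";
    public static final String PARQUET = "PARQUET"; // new: Parquet file-type entry

    private HdfsConstant() {}
}

// Hypothetical placement inside the HdfsReader task, alongside the existing
// TEXT/ORC/RC/SEQ branches:
//
//   else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.PARQUET)) {
//       dfsUtil.parquetFileStartRead(sourceFile, taskConfig, recordSender, taskPluginCollector);
//   }
```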
Only the three classes that actually change need to be modified. The specific code is as follows:
DFSUtil
```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.alibaba.datax.common.base.Key;
import com.alibaba.datax.common.element.BoolColumn;
import com.alibaba.datax.common.element.BytesColumn;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.ColumnEntry;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.DoubleColumn;
import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.storage.reader.StorageReaderErrorCode;
import com.alibaba.datax.storage.reader.StorageReaderUtil;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.orc.TypeDescription;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.api.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.alibaba.datax.common.base.Key.COLUMN;
import static com.alibaba.datax.common.base.Key.NULL_FORMAT;

public class DFSUtil {

    private static final Logger LOG = LoggerFactory.getLogger(DFSUtil.class);

    // the offset of julian, 2440588 is 1970/1/1
    private static final int JULIAN_EPOCH_OFFSET_DAYS = 2440588;
    private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
    private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);
    private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;

    private final org.apache.hadoop.conf.Configuration hadoopConf;
    private final boolean haveKerberos;
    private final HashSet<String> sourceHDFSAllFilesList = new HashSet<>();
    private String specifiedFileType = null;
    private String kerberosKeytabFilePath;
    private String kerberosPrincipal;

    public DFSUtil(Configuration taskConfig) {
        hadoopConf = new org.apache.hadoop.conf.Configuration();
        // io.file.buffer.size performance parameter
        // http://blog.csdn.net/yangjl38/article/details/7583374
        Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG);
        JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG));
        if (null != hadoopSiteParams) {
            Set<String> paramKeys = hadoopSiteParams.getKeys();
            for (String each : paramKeys) {
                hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));
            }
        }
        hadoopConf.set(HdfsConstant.HDFS_DEFAULT_KEY, taskConfig.getString(Key.DEFAULT_FS));

        // whether Kerberos authentication is enabled
        this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false);
        if (haveKerberos) {
            this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
            this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL);
            this.hadoopConf.set(HdfsConstant.HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");
        }
        this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath);

        LOG.info("hadoopConfig details:{}", JSON.toJSONString(this.hadoopConf));
    }

    private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath) {
        if (haveKerberos && StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
            UserGroupInformation.setConfiguration(hadoopConf);
            try {
                UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
            }
            catch (Exception e) {
                String message = String.format("kerberos認證失敗,請確定kerberosKeytabFilePath[%s]和kerberosPrincipal[%s]填寫正確",
                        kerberosKeytabFilePath, kerberosPrincipal);
                throw DataXException.asDataXException(HdfsReaderErrorCode.KERBEROS_LOGIN_ERROR, message, e);
            }
        }
    }

    /**
     * Get the absolute paths of all qualifying files under the given source paths.
     *
     * @param srcPaths          list of source paths
     * @param specifiedFileType the configured file type
     * @return set of string
     */
    public Set<String> getAllFiles(List<String> srcPaths, String specifiedFileType) {
        this.specifiedFileType = specifiedFileType;
        if (!srcPaths.isEmpty()) {
            for (String eachPath : srcPaths) {
                LOG.info("get HDFS all files in path = [{}]", eachPath);
                getHDFSAllFiles(eachPath);
            }
        }
        return sourceHDFSAllFilesList;
    }

    private void addSourceFileIfNotEmpty(FileStatus f) {
        if (f.isFile()) {
            String filePath = f.getPath().toString();
            if (f.getLen() > 0) {
                addSourceFileByType(filePath);
            }
            else {
                LOG.warn("文件[{}]長度為0,將會跳過不作處理!", filePath);
            }
        }
    }

    public void getHDFSAllFiles(String hdfsPath) {
        try {
            FileSystem hdfs = FileSystem.get(hadoopConf);
            // check whether hdfsPath contains glob characters
            if (hdfsPath.contains("*") || hdfsPath.contains("?")) {
                Path path = new Path(hdfsPath);
                FileStatus[] stats = hdfs.globStatus(path);
                for (FileStatus f : stats) {
                    if (f.isFile()) {
                        addSourceFileIfNotEmpty(f);
                    }
                    else if (f.isDirectory()) {
                        getHDFSAllFilesNORegex(f.getPath().toString(), hdfs);
                    }
                }
            }
            else {
                getHDFSAllFilesNORegex(hdfsPath, hdfs);
            }
        }
        catch (IOException e) {
            String message = String.format("無法讀取路徑[%s]下的所有文件,請確認您的配置項fs.defaultFS, path的值是否正確,"
                    + "是否有讀寫許可權,網路是否已斷開!", hdfsPath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.PATH_CONFIG_ERROR, e);
        }
    }

    private void getHDFSAllFilesNORegex(String path, FileSystem hdfs) throws IOException {
        // root directory of the files to be read
        Path listFiles = new Path(path);

        // If the network disconnected, this method will retry 45 times
        // each time the retry interval for 20 seconds
        // list all entries directly under the root directory
        FileStatus[] stats = hdfs.listStatus(listFiles);

        for (FileStatus f : stats) {
            // if the entry is a directory, recurse into it
            if (f.isDirectory()) {
                LOG.info("[{}] 是目錄, 遞歸獲取該目錄下的文件", f.getPath());
                getHDFSAllFilesNORegex(f.getPath().toString(), hdfs);
            }
            else if (f.isFile()) {
                addSourceFileIfNotEmpty(f);
            }
            else {
                String message = String.format("該路徑[%s]文件類型既不是目錄也不是文件,插件自動忽略。", f.getPath());
                LOG.info(message);
            }
        }
    }

    // add the path to sourceHDFSAllFilesList if the file matches the user-specified file type
    private void addSourceFileByType(String filePath) {
        // check whether the file's actual type matches the configured fileType
        boolean isMatchedFileType = checkHdfsFileType(filePath, this.specifiedFileType);
        if (isMatchedFileType) {
            String msg = String.format("[%s]是[%s]類型的文件, 將該文件加入source files列表", filePath, this.specifiedFileType);
            LOG.info(msg);
            sourceHDFSAllFilesList.add(filePath);
        }
        else {
            String message = String.format("文件[%s]的類型與用戶配置的fileType類型不一致,"
                    + "請確認您配置的目錄下麵所有文件的類型均為[%s]", filePath, this.specifiedFileType);
            LOG.error(message);
            throw DataXException.asDataXException(
                    HdfsReaderErrorCode.FILE_TYPE_UNSUPPORTED, message);
        }
    }

    public InputStream getInputStream(String filepath) {
        InputStream inputStream;
        Path path = new Path(filepath);
        try {
            FileSystem fs = FileSystem.get(hadoopConf);
            // If the network disconnected, this method will retry 45 times
            // each time the retry interval for 20 seconds
            inputStream = fs.open(path);
            return inputStream;
        }
        catch (IOException e) {
            String message = String.format("讀取文件 : [%s] 時出錯,請確認文件:[%s]存在且配置的用戶有許可權讀取", filepath, filepath);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e);
        }
    }

    public void sequenceFileStartRead(String sourceSequenceFilePath, Configuration readerSliceConfig,
            RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
        LOG.info("Start Read sequence file [{}].", sourceSequenceFilePath);

        Path seqFilePath = new Path(sourceSequenceFilePath);
        try (SequenceFile.Reader reader = new SequenceFile.Reader(this.hadoopConf,
                SequenceFile.Reader.file(seqFilePath))) {
            // obtain the SequenceFile.Reader instance
            // read key and value
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), this.hadoopConf);
            Text value = new Text();
            while (reader.next(key, value)) {
                if (StringUtils.isNotBlank(value.toString())) {
                    StorageReaderUtil.transportOneRecord(recordSender, readerSliceConfig, taskPluginCollector, value.toString());
                }
            }
        }
        catch (Exception e) {
            String message = String.format("SequenceFile.Reader讀取文件[%s]時出錯", sourceSequenceFilePath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_SEQUENCE_FILE_ERROR, message, e);
        }
    }

    public void rcFileStartRead(String sourceRcFilePath, Configuration readerSliceConfig,
            RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
        LOG.info("Start Read rc-file [{}].", sourceRcFilePath);
        List<ColumnEntry> column = StorageReaderUtil
                .getListColumnEntry(readerSliceConfig, COLUMN);
        // warn: no default value '\N'
        String nullFormat = readerSliceConfig.getString(NULL_FORMAT);

        Path rcFilePath = new Path(sourceRcFilePath);
        RCFileRecordReader recordReader = null;
        try (FileSystem fs = FileSystem.get(rcFilePath.toUri(), hadoopConf)) {
            long fileLen = fs.getFileStatus(rcFilePath).getLen();
            FileSplit split = new FileSplit(rcFilePath, 0, fileLen, (String[]) null);
            recordReader = new RCFileRecordReader(hadoopConf, split);
            LongWritable key = new LongWritable();
            BytesRefArrayWritable value = new BytesRefArrayWritable();
            Text txt = new Text();
            while (recordReader.next(key, value)) {
                String[] sourceLine = new String[value.size()];
                txt.clear();
                for (int i = 0; i < value.size(); i++) {
                    BytesRefWritable v = value.get(i);
                    txt.set(v.getData(), v.getStart(), v.getLength());
                    sourceLine[i] = txt.toString();
                }
                StorageReaderUtil.transportOneRecord(recordSender, column, sourceLine, nullFormat, taskPluginCollector);
            }
        }
        catch (IOException e) {
            String message = String.format("讀取文件[%s]時出錯", sourceRcFilePath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_RCFILE_ERROR, message, e);
        }
        finally {
            try {
                if (recordReader != null) {
                    recordReader.close();
                    LOG.info("Finally, Close RCFileRecordReader.");
                }
            }
            catch (IOException e) {
                LOG.warn(String.format("finally: 關閉RCFileRecordReader失敗, %s", e.getMessage()));
            }
        }
    }

    public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig,
            RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
        LOG.info("Start Read orc-file [{}].", sourceOrcFilePath);
        List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);
        String nullFormat = readerSliceConfig.getString(NULL_FORMAT);

        try {
            Path orcFilePath = new Path(sourceOrcFilePath);
            Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(hadoopConf));
            TypeDescription schema = reader.getSchema();
            assert column != null;
            if (column.isEmpty()) {
                for (int i = 0; i < schema.getChildren().size(); i++) {
                    ColumnEntry columnEntry = new ColumnEntry();
                    columnEntry.setIndex(i);
                    columnEntry.setType(schema.getChildren().get(i).getCategory().getName());
                    column.add(columnEntry);
                }
            }

            VectorizedRowBatch rowBatch = schema.createRowBatch(1024);
            org.apache.orc.RecordReader rowIterator = reader.rows(reader.options().schema(schema));
            while (rowIterator.nextBatch(rowBatch)) {
                transportOrcRecord(rowBatch, column, recordSender, taskPluginCollector, nullFormat);
            }
        }
        catch (Exception e) {
            String message = String.format("從orc-file文件路徑[%s]中讀取數據發生異常,請聯繫系統管理員。", sourceOrcFilePath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
        }
    }

    private void transportOrcRecord(VectorizedRowBatch rowBatch, List<ColumnEntry> columns, RecordSender recordSender,
            TaskPluginCollector taskPluginCollector, String nullFormat) {
        Record record;
        for (int row = 0; row < rowBatch.size; row++) {
            record = recordSender.createRecord();
            try {
                for (ColumnEntry column : columns) {
                    Column columnGenerated;
                    if (column.getValue() != null) {
                        if (!"null".equals(column.getValue())) {
                            columnGenerated = new StringColumn(column.getValue());
                        }
                        else {
                            columnGenerated = new StringColumn();
                        }
                        record.addColumn(columnGenerated);
                        continue;
                    }
                    int i = column.getIndex();
                    String columnType = column.getType().toUpperCase();
                    ColumnVector col = rowBatch.cols[i];
                    Type type = Type.valueOf(columnType);
                    if (col.isNull[row]) {
                        record.addColumn(new StringColumn(null));
                        continue;
                    }
                    switch (type) {
                        case INT:
                        case LONG:
                        case BOOLEAN:
                        case BIGINT:
                            columnGenerated = new LongColumn(((LongColumnVector) col).vector[row]);
                            break;
                        case DATE:
                            columnGenerated = new DateColumn(new Date(((LongColumnVector) col).vector[row]));
                            break;
                        case DOUBLE:
                            columnGenerated = new DoubleColumn(((DoubleColumnVector) col).vector[row]);
                            break;
                        case DECIMAL:
                            columnGenerated = new DoubleColumn(((DecimalColumnVector) col).vector[row].doubleValue());
                            break;
                        case BINARY:
                            BytesColumnVector b = (BytesColumnVector) col;
                            byte[] val = Arrays.copyOfRange(b.vector[row], b.start[row], b.start[row] + b.length[row]);
                            columnGenerated = new BytesColumn(val);
                            break;
                        case TIMESTAMP:
                            columnGenerated = new DateColumn(((TimestampColumnVector) col).getTime(row));
                            break;
                        default:
                            // type is string or other
                            String v = ((BytesColumnVector) col).toString(row);
                            columnGenerated = v.equals(nullFormat) ? new StringColumn() : new StringColumn(v);
                            break;
                    }
                    record.addColumn(columnGenerated);
                }
                recordSender.sendToWriter(record);
            }
            catch (Exception e) {
                if (e instanceof DataXException) {
                    throw (DataXException) e;
                }
                taskPluginCollector.collectDirtyRecord(record, e.getMessage());
            }
        }
    }

    public void parquetFileStartRead(String sourceParquetFilePath, Configuration readerSliceConfig,
            RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
        LOG.info("Start Read parquet-file [{}].", sourceParquetFilePath);
        List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);
        String nullFormat = readerSliceConfig.getString(NULL_FORMAT);

        Path parquetFilePath = new Path(sourceParquetFilePath);

        hadoopConf.set("parquet.avro.readInt96AsFixed", "true");
        JobConf conf = new JobConf(hadoopConf);

        GenericData decimalSupport = new GenericData();
        decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
        try (ParquetReader<GenericData.Record> reader = AvroParquetReader
                .<GenericData.Record>builder(HadoopInputFile.fromPath(parquetFilePath, hadoopConf))
                .withDataModel(decimalSupport)
                .withConf(conf)
                .build()) {
            GenericData.Record gRecord = reader.read();
            Schema schema = gRecord.getSchema();

            if (null == column || column.isEmpty()) {
                column = new ArrayList<>(schema.getFields().size());
                String sType;
                // the user did not configure column details, so build them from the parquet schema
                for (int i = 0; i < schema.getFields().size(); i++) {
                    ColumnEntry columnEntry = new ColumnEntry();
                    columnEntry.setIndex(i);
                    Schema type;
                    if (schema.getFields().get(i).schema().getType() == Schema.Type.UNION) {
                        type = schema.getFields().get(i).schema().getTypes().get(1);
                    }
                    else {
                        type = schema.getFields().get(i).schema();
                    }
                    sType = type.getProp("logicalType") != null ? type.getProp("logicalType") : type.getType().getName();
                    if (sType.startsWith("timestamp")) {
                        columnEntry.setType("timestamp");
                    }
                    else {
                        columnEntry.setType(sType);
                    }
                    column.add(columnEntry);
                }
            }
            while (gRecord != null) {
                transportParquetRecord(column, gRecord, recordSender, taskPluginCollector, nullFormat);
                gRecord = reader.read();
            }
        }
        catch (IOException e) {
            String message = String.format("從parquet file文件路徑[%s]中讀取數據發生異常,請聯繫系統管理員。", sourceParquetFilePath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
        }
    }

    /*
     * create a transport record for Parquet file
     */
    private void transportParquetRecord(List<ColumnEntry> columnConfigs, GenericData.Record gRecord, RecordSender recordSender,
            TaskPluginCollector taskPluginCollector, String nullFormat) {
        Record record = recordSender.createRecord();
        Column columnGenerated;
        int scale = 10;
        try {
            for (ColumnEntry columnEntry : columnConfigs) {
                String columnType = columnEntry.getType();
                Integer columnIndex = columnEntry.getIndex();
                String columnConst = columnEntry.getValue();
                String columnValue = null;
                if (null != columnIndex) {
                    if (null != gRecord.get(columnIndex)) {
                        columnValue = gRecord.get(columnIndex).toString();
                    }
                    else {
                        record.addColumn(new StringColumn(null));
                        continue;
                    }
                }
                else {
                    columnValue = columnConst;
                }
                if (columnType.startsWith("decimal(")) {
                    String ps = columnType.replace("decimal(", "").replace(")", "");
                    columnType = "decimal";
                    if (ps.contains(",")) {
                        scale = Integer.parseInt(ps.split(",")[1].trim());
                    }
                    else {
                        scale = 0;
                    }
                }
                Type type = Type.valueOf(columnType.toUpperCase());
                if (StringUtils.equals(columnValue, nullFormat)) {
                    columnValue = null;
                }
                try {
                    switch (type) {
                        case STRING:
                            columnGenerated = new StringColumn(columnValue);
                            break;
                        case INT:
                        case LONG:
                            columnGenerated = new LongColumn(columnValue);
                            break;
                        case DOUBLE:
                            columnGenerated = new DoubleColumn(columnValue);
                            break;
                        case DECIMAL:
                            if (null == columnValue) {
                                columnGenerated = new DoubleColumn((Double) null);
                            }
                            else {
                                columnGenerated = new DoubleColumn(new BigDecimal(columnValue).setScale(scale, RoundingMode.HALF_UP));
                            }
                            break;
                        case BOOLEAN:
                            columnGenerated = new BoolColumn(columnValue);
                            break;
                        case DATE:
                            if (columnValue == null) {
                                columnGenerated = new DateColumn((Date) null);
                            }
                            else {
                                String formatString = columnEntry.getFormat();
                                if (StringUtils.isNotBlank(formatString)) {
                                    // convert using the user-configured format
                                    SimpleDateFormat format = new SimpleDateFormat(formatString);
                                    columnGenerated = new DateColumn(format.parse(columnValue));
                                }
                                else {
                                    // let the framework attempt the conversion
                                    columnGenerated = new DateColumn(new StringColumn(columnValue).asDate());
                                }
                            }
                            break;
                        case TIMESTAMP:
                            if (null == columnValue) {
                                columnGenerated = new DateColumn();
                            }
                            else if (columnValue.startsWith("[")) {
                                // INT96 https://github.com/apache/parquet-mr/pull/901
                                GenericData.Fixed fixed = (GenericData.Fixed) gRecord.get(columnIndex);
                                Date date = new Date(getTimestampMills(fixed.bytes()));
                                columnGenerated = new DateColumn(date);
                            }
                            else {
                                columnGenerated = new DateColumn(Long.parseLong(columnValue) * 1000);
                            }
                            break;
                        case BINARY:
                            columnGenerated = new BytesColumn(((ByteBuffer) gRecord.get(columnIndex)).array());
                            break
```