DataX二次開發——HdfsReader和HdfsWriter插件增加parquet文件讀寫

来源:https://www.cnblogs.com/wxm2270/archive/2023/02/13/17115373.html
-Advertisement-
Play Games

一、研發背景 DataX官方開源的版本支持HDFS文件的讀寫,但是截止目前,並沒有支持Parquet文件的讀寫,得益於DataX出色的數據同步性能,去年公司的項目大部分採用了DataX作為數據同步工具,但是從CDH集群同步Parquet或者將其他數據源的數據以Parquet格式寫入HDFS,這兩個常 ...


一、研發背景

    DataX官方開源的版本支持HDFS文件的讀寫,但是截止目前,並沒有支持Parquet文件的讀寫,得益於DataX出色的數據同步性能,去年公司的項目大部分採用了DataX作為數據同步工具,但是從CDH集群同步Parquet或者將其他數據源的數據以Parquet格式寫入HDFS,這兩個常用場景沒有進行支持。因此只能自己動手,補充HdfsReader和HdfsWriter插件,以支持Parquet文件的讀寫。

二、HdfsReader插件

    本插件比較簡單,一共五個類,具體類名及對應修改項如下:

  • DFSUtil:增加是否Parquet文件類型判斷方法、增加Parquet文件讀取轉換方法。
  • HdfsConstant:增加Parquet文件類的枚舉項。
  • HdfsReader:增加判斷是否配置為Parquet文件類型的判斷條件分支。
  • HdfsReaderErrorCode:無需更改。
  • Type:無需更改。

    按需修改其中四個類即可,具體代碼如下:

    DFSUtil

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.alibaba.datax.common.base.Key;
import com.alibaba.datax.common.element.BoolColumn;
import com.alibaba.datax.common.element.BytesColumn;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.ColumnEntry;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.DoubleColumn;
import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.storage.reader.StorageReaderErrorCode;
import com.alibaba.datax.storage.reader.StorageReaderUtil;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.orc.TypeDescription;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.api.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.alibaba.datax.common.base.Key.COLUMN;
import static com.alibaba.datax.common.base.Key.NULL_FORMAT;


public class DFSUtil
{
    private static final Logger LOG = LoggerFactory.getLogger(DFSUtil.class);

    // the offset of julian, 2440588 is 1970/1/1
    private static final int JULIAN_EPOCH_OFFSET_DAYS = 2440588;
    private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
    private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);

    private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;
    private final org.apache.hadoop.conf.Configuration hadoopConf;
    private final boolean haveKerberos;
    private final HashSet<String> sourceHDFSAllFilesList = new HashSet<>();
    private String specifiedFileType = null;
    private String kerberosKeytabFilePath;
    private String kerberosPrincipal;

    public DFSUtil(Configuration taskConfig)
    {
        hadoopConf = new org.apache.hadoop.conf.Configuration();
        //io.file.buffer.size 性能參數
        //http://blog.csdn.net/yangjl38/article/details/7583374
        Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG);
        JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG));
        if (null != hadoopSiteParams) {
            Set<String> paramKeys = hadoopSiteParams.getKeys();
            for (String each : paramKeys) {
                hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));
            }
        }
        hadoopConf.set(HdfsConstant.HDFS_DEFAULT_KEY, taskConfig.getString(Key.DEFAULT_FS));

        //是否有Kerberos認證
        this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false);
        if (haveKerberos) {
            this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
            this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL);
            this.hadoopConf.set(HdfsConstant.HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");
        }
        this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath);

        LOG.info("hadoopConfig details:{}", JSON.toJSONString(this.hadoopConf));
    }

    private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath)
    {
        if (haveKerberos && StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
            UserGroupInformation.setConfiguration(hadoopConf);
            try {
                UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
            }
            catch (Exception e) {
                String message = String.format("kerberos認證失敗,請確定kerberosKeytabFilePath[%s]和kerberosPrincipal[%s]填寫正確",
                        kerberosKeytabFilePath, kerberosPrincipal);
                throw DataXException.asDataXException(HdfsReaderErrorCode.KERBEROS_LOGIN_ERROR, message, e);
            }
        }
    }

    /**
     * 獲取指定路徑列表下符合條件的所有文件的絕對路徑
     *
     * @param srcPaths 路徑列表
     * @param specifiedFileType 指定文件類型
     * @return set of string
     */
    public Set<String> getAllFiles(List<String> srcPaths, String specifiedFileType)
    {

        this.specifiedFileType = specifiedFileType;

        if (!srcPaths.isEmpty()) {
            for (String eachPath : srcPaths) {
                LOG.info("get HDFS all files in path = [{}]", eachPath);
                getHDFSAllFiles(eachPath);
            }
        }
        return sourceHDFSAllFilesList;
    }

    private void addSourceFileIfNotEmpty(FileStatus f)
    {
        if (f.isFile()) {
            String filePath = f.getPath().toString();
            if (f.getLen() > 0) {
                addSourceFileByType(filePath);
            }
            else {
                LOG.warn("文件[{}]長度為0,將會跳過不作處理!", filePath);
            }
        }
    }

    public void getHDFSAllFiles(String hdfsPath)
    {

        try {
            FileSystem hdfs = FileSystem.get(hadoopConf);
            //判斷hdfsPath是否包含正則符號
            if (hdfsPath.contains("*") || hdfsPath.contains("?")) {
                Path path = new Path(hdfsPath);
                FileStatus[] stats = hdfs.globStatus(path);
                for (FileStatus f : stats) {
                    if (f.isFile()) {
                        addSourceFileIfNotEmpty(f);
                    }
                    else if (f.isDirectory()) {
                        getHDFSAllFilesNORegex(f.getPath().toString(), hdfs);
                    }
                }
            }
            else {
                getHDFSAllFilesNORegex(hdfsPath, hdfs);
            }
        }
        catch (IOException e) {
            String message = String.format("無法讀取路徑[%s]下的所有文件,請確認您的配置項fs.defaultFS, path的值是否正確," +
                    "是否有讀寫許可權,網路是否已斷開!", hdfsPath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.PATH_CONFIG_ERROR, e);
        }
    }

    private void getHDFSAllFilesNORegex(String path, FileSystem hdfs)
            throws IOException
    {

        // 獲取要讀取的文件的根目錄
        Path listFiles = new Path(path);

        // If the network disconnected, this method will retry 45 times
        // each time the retry interval for 20 seconds
        // 獲取要讀取的文件的根目錄的所有二級子文件目錄
        FileStatus[] stats = hdfs.listStatus(listFiles);

        for (FileStatus f : stats) {
            // 判斷是不是目錄,如果是目錄,遞歸調用
            if (f.isDirectory()) {
                LOG.info("[{}] 是目錄, 遞歸獲取該目錄下的文件", f.getPath());
                getHDFSAllFilesNORegex(f.getPath().toString(), hdfs);
            }
            else if (f.isFile()) {
                addSourceFileIfNotEmpty(f);
            }
            else {
                String message = String.format("該路徑[%s]文件類型既不是目錄也不是文件,插件自動忽略。", f.getPath());
                LOG.info(message);
            }
        }
    }

    // 根據用戶指定的文件類型,將指定的文件類型的路徑加入sourceHDFSAllFilesList
    private void addSourceFileByType(String filePath)
    {
        // 檢查file的類型和用戶配置的fileType類型是否一致
        boolean isMatchedFileType = checkHdfsFileType(filePath, this.specifiedFileType);

        if (isMatchedFileType) {
            String msg = String.format("[%s]是[%s]類型的文件, 將該文件加入source files列表", filePath, this.specifiedFileType);
            LOG.info(msg);
            sourceHDFSAllFilesList.add(filePath);
        }
        else {
            String message = String.format("文件[%s]的類型與用戶配置的fileType類型不一致," +
                            "請確認您配置的目錄下麵所有文件的類型均為[%s]"
                    , filePath, this.specifiedFileType);
            LOG.error(message);
            throw DataXException.asDataXException(
                    HdfsReaderErrorCode.FILE_TYPE_UNSUPPORTED, message);
        }
    }

    public InputStream getInputStream(String filepath)
    {
        InputStream inputStream;
        Path path = new Path(filepath);
        try {
            FileSystem fs = FileSystem.get(hadoopConf);
            //If the network disconnected, this method will retry 45 times
            //each time the retry interval for 20 seconds
            inputStream = fs.open(path);
            return inputStream;
        }
        catch (IOException e) {
            String message = String.format("讀取文件 : [%s] 時出錯,請確認文件:[%s]存在且配置的用戶有許可權讀取", filepath, filepath);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e);
        }
    }

    public void sequenceFileStartRead(String sourceSequenceFilePath, Configuration readerSliceConfig,
            RecordSender recordSender, TaskPluginCollector taskPluginCollector)
    {
        LOG.info("Start Read sequence file [{}].", sourceSequenceFilePath);

        Path seqFilePath = new Path(sourceSequenceFilePath);
        try (SequenceFile.Reader reader = new SequenceFile.Reader(this.hadoopConf,
                SequenceFile.Reader.file(seqFilePath))) {
            //獲取SequenceFile.Reader實例
            //獲取key 與 value
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), this.hadoopConf);
            Text value = new Text();
            while (reader.next(key, value)) {
                if (StringUtils.isNotBlank(value.toString())) {
                    StorageReaderUtil.transportOneRecord(recordSender, readerSliceConfig, taskPluginCollector, value.toString());
                }
            }
        }
        catch (Exception e) {
            String message = String.format("SequenceFile.Reader讀取文件[%s]時出錯", sourceSequenceFilePath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_SEQUENCE_FILE_ERROR, message, e);
        }
    }

    public void rcFileStartRead(String sourceRcFilePath, Configuration readerSliceConfig,
            RecordSender recordSender, TaskPluginCollector taskPluginCollector)
    {
        LOG.info("Start Read rc-file [{}].", sourceRcFilePath);
        List<ColumnEntry> column = StorageReaderUtil
                .getListColumnEntry(readerSliceConfig, COLUMN);
        // warn: no default value '\N'
        String nullFormat = readerSliceConfig.getString(NULL_FORMAT);

        Path rcFilePath = new Path(sourceRcFilePath);
        RCFileRecordReader recordReader = null;
        try (FileSystem fs = FileSystem.get(rcFilePath.toUri(), hadoopConf)) {
            long fileLen = fs.getFileStatus(rcFilePath).getLen();
            FileSplit split = new FileSplit(rcFilePath, 0, fileLen, (String[]) null);
            recordReader = new RCFileRecordReader(hadoopConf, split);
            LongWritable key = new LongWritable();
            BytesRefArrayWritable value = new BytesRefArrayWritable();
            Text txt = new Text();
            while (recordReader.next(key, value)) {
                String[] sourceLine = new String[value.size()];
                txt.clear();
                for (int i = 0; i < value.size(); i++) {
                    BytesRefWritable v = value.get(i);
                    txt.set(v.getData(), v.getStart(), v.getLength());
                    sourceLine[i] = txt.toString();
                }
                StorageReaderUtil.transportOneRecord(recordSender,
                        column, sourceLine, nullFormat, taskPluginCollector);
            }
        }
        catch (IOException e) {
            String message = String.format("讀取文件[%s]時出錯", sourceRcFilePath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_RCFILE_ERROR, message, e);
        }
        finally {
            try {
                if (recordReader != null) {
                    recordReader.close();
                    LOG.info("Finally, Close RCFileRecordReader.");
                }
            }
            catch (IOException e) {
                LOG.warn(String.format("finally: 關閉RCFileRecordReader失敗, %s", e.getMessage()));
            }
        }
    }

    public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig,
            RecordSender recordSender, TaskPluginCollector taskPluginCollector)
    {
        LOG.info("Start Read orc-file [{}].", sourceOrcFilePath);
        List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);
        String nullFormat = readerSliceConfig.getString(NULL_FORMAT);

        try {
            Path orcFilePath = new Path(sourceOrcFilePath);
            Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(hadoopConf));
            TypeDescription schema = reader.getSchema();
            assert column != null;
            if (column.isEmpty()) {
                for (int i = 0; i < schema.getChildren().size(); i++) {
                    ColumnEntry columnEntry = new ColumnEntry();
                    columnEntry.setIndex(i);
                    columnEntry.setType(schema.getChildren().get(i).getCategory().getName());
                    column.add(columnEntry);
                }
            }

            VectorizedRowBatch rowBatch = schema.createRowBatch(1024);
            org.apache.orc.RecordReader rowIterator = reader.rows(reader.options().schema(schema));
            while (rowIterator.nextBatch(rowBatch)) {
                transportOrcRecord(rowBatch, column, recordSender, taskPluginCollector, nullFormat);
            }
        }
        catch (Exception e) {
            String message = String.format("從orc-file文件路徑[%s]中讀取數據發生異常,請聯繫系統管理員。"
                    , sourceOrcFilePath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
        }
    }

    private void transportOrcRecord(VectorizedRowBatch rowBatch, List<ColumnEntry> columns, RecordSender recordSender,
            TaskPluginCollector taskPluginCollector, String nullFormat)
    {
        Record record;
        for (int row = 0; row < rowBatch.size; row++) {
            record = recordSender.createRecord();
            try {
                for (ColumnEntry column : columns) {

                    Column columnGenerated;
                    if (column.getValue() != null) {
                        if (!"null".equals(column.getValue())) {
                            columnGenerated = new StringColumn(column.getValue());
                        }
                        else {
                            columnGenerated = new StringColumn();
                        }
                        record.addColumn(columnGenerated);
                        continue;
                    }
                    int i = column.getIndex();
                    String columnType = column.getType().toUpperCase();
                    ColumnVector col = rowBatch.cols[i];
                    Type type = Type.valueOf(columnType);
                    if (col.isNull[row]) {
                        record.addColumn(new StringColumn(null));
                        continue;
                    }
                    switch (type) {
                        case INT:
                        case LONG:
                        case BOOLEAN:
                        case BIGINT:
                            columnGenerated = new LongColumn(((LongColumnVector) col).vector[row]);
                            break;
                        case DATE:
                            columnGenerated = new DateColumn(new Date(((LongColumnVector) col).vector[row]));
                            break;
                        case DOUBLE:
                            columnGenerated = new DoubleColumn(((DoubleColumnVector) col).vector[row]);
                            break;
                        case DECIMAL:
                            columnGenerated = new DoubleColumn(((DecimalColumnVector) col).vector[row].doubleValue());
                            break;
                        case BINARY:
                            BytesColumnVector b = (BytesColumnVector) col;
                            byte[] val = Arrays.copyOfRange(b.vector[row], b.start[row], b.start[row] + b.length[row]);
                            columnGenerated = new BytesColumn(val);
                            break;
                        case TIMESTAMP:
                            columnGenerated = new DateColumn(((TimestampColumnVector) col).getTime(row));
                            break;
                        default:
                            // type is string or other
                            String v = ((BytesColumnVector) col).toString(row);
                            columnGenerated = v.equals(nullFormat) ? new StringColumn() : new StringColumn(v);
                            break;
                    }
                    record.addColumn(columnGenerated);
                }
                recordSender.sendToWriter(record);
            }
            catch (Exception e) {
                if (e instanceof DataXException) {
                    throw (DataXException) e;
                }
                taskPluginCollector.collectDirtyRecord(record, e.getMessage());
            }
        }
    }

    public void parquetFileStartRead(String sourceParquetFilePath, Configuration readerSliceConfig,
            RecordSender recordSender, TaskPluginCollector taskPluginCollector)
    {
        LOG.info("Start Read parquet-file [{}].", sourceParquetFilePath);
        List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);
        String nullFormat = readerSliceConfig.getString(NULL_FORMAT);
        Path parquetFilePath = new Path(sourceParquetFilePath);

        hadoopConf.set("parquet.avro.readInt96AsFixed", "true");
        JobConf conf = new JobConf(hadoopConf);

        GenericData decimalSupport = new GenericData();
        decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
        try (ParquetReader<GenericData.Record> reader = AvroParquetReader
                .<GenericData.Record>builder(HadoopInputFile.fromPath(parquetFilePath, hadoopConf))
                .withDataModel(decimalSupport)
                .withConf(conf)
                .build()) {
            GenericData.Record gRecord = reader.read();
            Schema schema = gRecord.getSchema();

            if (null == column || column.isEmpty()) {
                column = new ArrayList<>(schema.getFields().size());
                String sType;
                // 用戶沒有填寫具體的欄位信息,需要從parquet文件構建
                for (int i = 0; i < schema.getFields().size(); i++) {
                    ColumnEntry columnEntry = new ColumnEntry();
                    columnEntry.setIndex(i);
                    Schema type;
                    if (schema.getFields().get(i).schema().getType() == Schema.Type.UNION) {
                        type = schema.getFields().get(i).schema().getTypes().get(1);
                    }
                    else {
                        type = schema.getFields().get(i).schema();
                    }
                    sType = type.getProp("logicalType") != null ? type.getProp("logicalType") : type.getType().getName();
                    if (sType.startsWith("timestamp")) {
                        columnEntry.setType("timestamp");
                    }
                    else {
                        columnEntry.setType(sType);
                    }
                    column.add(columnEntry);
                }
            }
            while (gRecord != null) {
                transportParquetRecord(column, gRecord, recordSender, taskPluginCollector, nullFormat);
                gRecord = reader.read();
            }
        }
        catch (IOException e) {
            String message = String.format("從parquet file文件路徑[%s]中讀取數據發生異常,請聯繫系統管理員。"
                    , sourceParquetFilePath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
        }
    }

    /*
     * create a transport record for Parquet file
     *
     *
     */
    private void transportParquetRecord(List<ColumnEntry> columnConfigs, GenericData.Record gRecord, RecordSender recordSender,
            TaskPluginCollector taskPluginCollector, String nullFormat)
    {
        Record record = recordSender.createRecord();
        Column columnGenerated;
        int scale = 10;
        try {
            for (ColumnEntry columnEntry : columnConfigs) {
                String columnType = columnEntry.getType();
                Integer columnIndex = columnEntry.getIndex();
                String columnConst = columnEntry.getValue();
                String columnValue = null;
                if (null != columnIndex) {
                    if (null != gRecord.get(columnIndex)) {
                        columnValue = gRecord.get(columnIndex).toString();
                    }
                    else {
                        record.addColumn(new StringColumn(null));
                        continue;
                    }
                }
                else {
                    columnValue = columnConst;
                }
                if (columnType.startsWith("decimal(")) {
                    String ps = columnType.replace("decimal(", "").replace(")", "");
                    columnType = "decimal";
                    if (ps.contains(",")) {
                        scale = Integer.parseInt(ps.split(",")[1].trim());
                    }
                    else {
                        scale = 0;
                    }
                }
                Type type = Type.valueOf(columnType.toUpperCase());
                if (StringUtils.equals(columnValue, nullFormat)) {
                    columnValue = null;
                }
                try {
                    switch (type) {
                        case STRING:
                            columnGenerated = new StringColumn(columnValue);
                            break;
                        case INT:
                        case LONG:
                            columnGenerated = new LongColumn(columnValue);
                            break;
                        case DOUBLE:
                            columnGenerated = new DoubleColumn(columnValue);
                            break;
                        case DECIMAL:
                            if (null == columnValue) {
                                columnGenerated = new DoubleColumn((Double) null);
                            }
                            else {
                                columnGenerated = new DoubleColumn(new BigDecimal(columnValue).setScale(scale, RoundingMode.HALF_UP));
                            }
                            break;
                        case BOOLEAN:
                            columnGenerated = new BoolColumn(columnValue);
                            break;
                        case DATE:
                            if (columnValue == null) {
                                columnGenerated = new DateColumn((Date) null);
                            }
                            else {
                                String formatString = columnEntry.getFormat();
                                if (StringUtils.isNotBlank(formatString)) {
                                    // 用戶自己配置的格式轉換
                                    SimpleDateFormat format = new SimpleDateFormat(
                                            formatString);
                                    columnGenerated = new DateColumn(
                                            format.parse(columnValue));
                                }
                                else {
                                    // 框架嘗試轉換
                                    columnGenerated = new DateColumn(new StringColumn(columnValue).asDate());
                                }
                            }
                            break;
                        case TIMESTAMP:
                            if (null == columnValue) {
                                columnGenerated = new DateColumn();
                            }
                            else if (columnValue.startsWith("[")) {
                                // INT96 https://github.com/apache/parquet-mr/pull/901
                                GenericData.Fixed fixed = (GenericData.Fixed) gRecord.get(columnIndex);
                                Date date = new Date(getTimestampMills(fixed.bytes()));
                                columnGenerated = new DateColumn(date);
                            }
                            else {
                                columnGenerated = new DateColumn(Long.parseLong(columnValue) * 1000);
                            }
                            break;
                        case BINARY:
                            columnGenerated = new BytesColumn(((ByteBuffer) gRecord.get(columnIndex)).array());
                            break
              
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 工作邏輯是用戶啟動主程式,主程式啟動更新程式,更新程式立刻檢查是否有已經下載好的更新包,如果有則立刻關閉主程式進行更新,如果沒有則訪問伺服器查詢更新包,併在後臺靜默下載,下載完成後等下一次主程式啟動時更新 由於只是簡單的更新程式,所以沒有用資料庫,客戶端版本號以一個json文件保存,服務端則直接以壓 ...
  • 1前言 爬蟲一般都是用Python來寫,生態豐富,動態語言開發速度快,調試也很方便 但是 我要說但是,動態語言也有其局限性,筆者作為老爬蟲帶師,幾乎各種語言都搞過,現在這個任務並不複雜,用我最喜歡的C#做小菜一碟~ 2開始 之前做 OneCat 項目的時候,最開始的數據採集模塊,就是用 C# 做的, ...
  • 修改Windows遠程桌面3389埠 Windows版本:windows10、Windows server 2016、2019(其他版本沒有測過,應該也適用) 1、Windows桌面上,點擊“開始-運行”或鍵盤按下“WIN+R”打開運行視窗,輸入“regedit”,點擊確定打開註冊表編輯器。 2、 ...
  • 自己編譯的內核進行修改後為後續方便查詢是那個版本的系統。 所以每次更改內核後都需要修改一下版本信息, 又因為內核一般是不變的為了區分所以增加到擴展版本上。 操作環境: 硬體是全志 V3S Linux內核是3.4 修改的方法: 方法一: 一個在menuconfig中進行增加 打開menuconfig ...
  • 1、大多數情況下很正常,偶爾很慢,則有如下原因 (1)、資料庫在刷新臟頁,例如 redo log 寫滿了需要同步到磁碟。 (2)、執行的時候,遇到鎖,如表鎖、行鎖。 (3)、sql寫的爛 2、這條 SQL 語句一直執行的很慢,則有如下原因 (1)、沒有用上索引或索引失效:例如該欄位沒有索引;或則由於 ...
  • 摘要:提供一種執行高效的TereData的marco遷移方案。 本文分享自華為雲社區《GaussDB(DWS)遷移 - teredata相容 -- macro相容 # 【玩轉PB級數倉GaussDB(DWS)】》,作者: 譡里個檔 。 Teradata的巨集是一組可以接受參數的SQL語句,通過調用巨集名 ...
  • 總結了一下在以往工作中,對於`Hive SQL`調優的一些實際應用,是日常積累的一些優化技巧,如有出入,歡迎在評論區留言探討~ ...
  • 記錄一些工作中有意思的統計指標,做過一些簡化方便大家閱讀,記錄如有錯誤,歡迎在評論區提問討論~ 問題類型 連續問題 兩種思路 第一種:日期減去一列數字得出日期相同,主要是通過row_number視窗函數 第二種:後一個日期減去前一個日期差值相等,用的較少,可以用lag/lead視窗函數解決 分組問題 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...