This article describes how to use DataX to read ORC files from COS and write them into StarRocks.
Requirement: synchronize 84 TB of data from Tencent Cloud COS into one large StarRocks table. A typical partition holds 2 to 3 billion rows, about 600 GB.
Tool: DataX
Plugins: hdfsreader, starrockswriter
Object storage: COS (non-converged bucket)
- hdfsreader:https://cloud.tencent.com/document/product/436/43654
- starrockswriter:https://docs.mirrorship.cn/zh/docs/loading/DataX-starrocks-writer
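If you install the two plugins yourself, each one is simply unpacked into the matching subdirectory of DataX's plugin directory. A minimal sketch, assuming the downloads are plain tarballs (the archive names here are placeholders; match them to whatever the links above give you):

# unpack each plugin into the matching reader/ or writer/ subdirectory
tar -xzf hdfsreader.tar.gz      -C datax/plugin/reader/
tar -xzf starrockswriter.tar.gz -C datax/plugin/writer/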
DataX
The DataX version used here is DataX (DATAX-OPENSOURCE-3.0):
[svccnetlhs@HOST datax]<231211 17:17:11>$ tree bin/ conf/
bin/
├── datax.py
├── dxprof.py
└── perftrace.py
conf/
├── core.json
└── logback.xml

0 directories, 5 files
[svccnetlhs@HOST datax]<231211 17:18:52>$ /bin/python3
python3     python3.6   python3.6m
[svccnetlhs@HOST datax]<231211 17:18:52>$ /bin/python3 bin/datax.py

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Usage: datax.py [options] job-url-or-path

Options:
  -h, --help            show this help message and exit

  Product Env Options:
    Normal user use these options to set jvm parameters, job runtime mode etc.
    Make sure these options can be used in Product Env.

    -j <jvm parameters>, --jvm=<jvm parameters>
                        Set jvm parameters if necessary.
    --jobid=<job unique id>
                        Set job unique id when running by Distribute/Local Mode.
    -m <job runtime mode>, --mode=<job runtime mode>
                        Set job runtime mode such as: standalone, local,
                        distribute. Default mode is standalone.
    -p <parameter used in job config>, --params=<parameter used in job config>
                        Set job parameter, eg: the source tableName you want to
                        set it by command, then you can use like this:
                        -p"-DtableName=your-table-name", if you have mutiple
                        parameters: -p"-DtableName=your-table-name
                        -DcolumnName=your-column-name". Note: you should config
                        in you job tableName with ${tableName}.
    -r <parameter used in view job config[reader] template>, --reader=<parameter used in view job config[reader] template>
                        View job config[reader] template, eg:
                        mysqlreader,streamreader
    -w <parameter used in view job config[writer] template>, --writer=<parameter used in view job config[writer] template>
                        View job config[writer] template, eg:
                        mysqlwriter,streamwriter

  Develop/Debug Options:
    Developer use these options to trace more details of DataX.

    -d, --debug         Set to remote debug mode.
    --loglevel=<log level>
                        Set log level such as: debug, info, all etc.

[svccnetlhs@HOST datax]<231211 17:19:06>$
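The -p option in the help output above is what makes the partition date dynamic later on: any ${variable} in the job JSON can be filled in from the command line. A sketch, assuming a job file named job/cos_to_starrocks.json that references ${ts} and ${path} (both names are illustrative):

python3 bin/datax.py \
    -p"-Dts=2023-11-14 -Dpath=/sam/sam_dwd_user_action_cos_d/20231114/part-*" \
    job/cos_to_starrocks.json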
DataX (HdfsReader) plugin
[svccnetlhs@HOST datax]<231211 17:23:29>$ ls
bin  conf  job  lib  log  log_perf  plugin  script  tmp
[svccnetlhs@HOST datax]<231211 17:23:30>$ cd plugin/
[svccnetlhs@HOST plugin]<231211 17:23:32>$ ls
reader  writer
[svccnetlhs@HOST plugin]<231211 17:23:32>$ cd reader/
[svccnetlhs@HOST reader]<231211 17:23:36>$ ls
cassandrareader datahubreader ftpreader hbase094xreader hbase11xsqlreader hdfsreader loghubreader mysqlreader odpsreader oraclereader otsreader postgresqlreader sqlserverreader streamreader tsdbreader
clickhousereader drdsreader gdbreader hbase11xreader hbase20xsqlreader kingbaseesreader mongodbreader oceanbasev10reader opentsdbreader ossreader otsstreamreader rdbmsreader starrocksreader tdenginereader txtfilereader
[svccnetlhs@HOST reader]<231211 17:23:37>$ cd hdfsreader/
[svccnetlhs@HOST hdfsreader]<231211 17:23:39>$ ls
hdfsreader-0.0.1-SNAPSHOT.jar  libs  plugin_job_template.json  plugin.json
[svccnetlhs@HOST hdfsreader]<231211 17:23:42>$ pwd
/home/svccnetlhs/chengken/starrocks/datax/plugin/reader/hdfsreader
[svccnetlhs@HOST hdfsreader]<231211 17:23:44>$ cd libs/
[svccnetlhs@HOST libs]<231211 17:23:54>$ ls
activation-1.1.jar commons-beanutils-1.9.2.jar curator-recipes-2.7.1.jar hadoop-mapreduce-client-core-2.7.1.jar httpclient-4.1.2.jar jetty-util-6.1.26.jar parquet-hadoop-bundle-1.6.0rc3.jar
aircompressor-0.3.jar commons-beanutils-core-1.8.0.jar datanucleus-api-jdo-3.2.6.jar hadoop-yarn-api-2.7.1.jar httpcore-4.1.2.jar jline-2.12.jar pentaho-aggdesigner-algorithm-5.1.5-jhyde.jar
annotations-2.0.3.jar commons-cli-1.2.jar datanucleus-core-3.2.10.jar hadoop-yarn-common-2.7.1.jar jackson-core-asl-1.9.13.jar jpam-1.1.jar plugin-unstructured-storage-util-0.0.1-SNAPSHOT.jar
ant-1.9.1.jar commons-codec-1.4.jar datanucleus-rdbms-3.2.9.jar hadoop-yarn-server-applicationhistoryservice-2.6.0.jar jackson-jaxrs-1.9.13.jar jsch-0.1.42.jar protobuf-java-2.5.0.jar
ant-launcher-1.9.1.jar commons-collections-3.2.1.jar datax-common-0.0.1-SNAPSHOT.jar hadoop-yarn-server-common-2.6.0.jar jackson-mapper-asl-1.9.13.jar jsp-api-2.1.jar servlet-api-2.5.jar
antlr-2.7.7.jar commons-compiler-2.7.6.jar derby-10.11.1.1.jar hadoop-yarn-server-resourcemanager-2.6.0.jar jackson-xc-1.9.13.jar jsr305-3.0.0.jar slf4j-api-1.7.10.jar
antlr-runtime-3.4.jar commons-compress-1.4.1.jar eigenbase-properties-1.1.4.jar hadoop-yarn-server-web-proxy-2.6.0.jar janino-2.7.6.jar jta-1.1.jar slf4j-log4j12-1.7.10.jar
aopalliance-1.0.jar commons-configuration-1.6.jar fastjson2-2.0.23.jar hamcrest-core-1.3.jar javacsv-2.0.jar leveldbjni-all-1.8.jar snappy-java-1.0.4.1.jar
apache-curator-2.6.0.pom commons-daemon-1.0.13.jar geronimo-annotation_1.0_spec-1.1.1.jar hive-ant-1.1.1.jar javax.inject-1.jar libfb303-0.9.2.jar ST4-4.0.4.jar
apacheds-i18n-2.0.0-M15.jar commons-dbcp-1.4.jar geronimo-jaspic_1.0_spec-1.0.jar hive-cli-1.1.1.jar java-xmlbuilder-0.4.jar libthrift-0.9.2.jar stax-api-1.0.1.jar
apacheds-kerberos-codec-2.0.0-M15.jar commons-digester-1.8.jar geronimo-jta_1.1_spec-1.1.1.jar hive-common-1.1.1.jar jaxb-api-2.2.2.jar log4j-1.2.17.jar stax-api-1.0-2.jar
apache-log4j-extras-1.2.17.jar commons-httpclient-3.1.jar groovy-all-2.1.6.jar hive-exec-1.1.1.jar jaxb-impl-2.2.3-1.jar log4j-api-2.17.1.jar stringtemplate-3.2.1.jar
api-asn1-api-1.0.0-M20.jar commons-io-2.4.jar gson-2.2.4.jar hive-hcatalog-core-1.1.1.jar jdo-api-3.0.1.jar log4j-core-2.17.1.jar velocity-1.5.jar
api-util-1.0.0-M20.jar commons-lang-2.6.jar guava-11.0.2.jar hive-metastore-1.1.1.jar jersey-client-1.9.jar logback-classic-1.0.13.jar xercesImpl-2.9.1.jar
asm-3.1.jar commons-lang3-3.3.2.jar guice-3.0.jar hive-serde-1.1.1.jar jersey-core-1.9.jar logback-core-1.0.13.jar xml-apis-1.3.04.jar
asm-commons-3.1.jar commons-logging-1.1.3.jar guice-servlet-3.0.jar hive-service-1.1.1.jar jersey-guice-1.9.jar lzo-core-1.0.5.jar xmlenc-0.52.jar
asm-tree-3.1.jar commons-math3-3.1.1.jar hadoop-aliyun-2.7.2.jar hive-shims-0.20S-1.1.1.jar jersey-json-1.9.jar mail-1.4.1.jar xz-1.0.jar
avro-1.7.4.jar commons-net-3.1.jar hadoop-annotations-2.7.1.jar hive-shims-0.23-1.1.1.jar jersey-server-1.9.jar netty-3.6.2.Final.jar zookeeper-3.4.6.jar
bonecp-0.8.0.RELEASE.jar commons-pool-1.5.4.jar hadoop-auth-2.7.1.jar hive-shims-1.1.1.jar jets3t-0.9.0.jar netty-all-4.0.23.Final.jar
calcite-avatica-1.0.0-incubating.jar cos_api-bundle-5.6.137.2.jar hadoop-common-2.7.1.jar hive-shims-common-1.1.1.jar jettison-1.1.jar opencsv-2.3.jar
calcite-core-1.0.0-incubating.jar curator-client-2.7.1.jar hadoop-cos-3.1.0-8.3.2.jar hive-shims-scheduler-1.1.1.jar jetty-6.1.26.jar oro-2.0.8.jar
calcite-linq4j-1.0.0-incubating.jar curator-framework-2.6.0.jar hadoop-hdfs-2.7.1.jar htrace-core-3.1.0-incubating.jar jetty-all-7.6.0.v20120127.jar paranamer-2.3.jar
[svccnetlhs@HOST libs]<231211 17:23:55>$
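The libraries that matter for this job are hadoop-cos-3.1.0-8.3.2.jar (it provides org.apache.hadoop.fs.CosFileSystem, the class named in the job's hadoopConfig below) and cos_api-bundle-5.6.137.2.jar (the COS SDK it depends on). A quick check that both are in place:

cd /home/svccnetlhs/chengken/starrocks/datax/plugin/reader/hdfsreader/libs
ls | grep cos
# cos_api-bundle-5.6.137.2.jar
# hadoop-cos-3.1.0-8.3.2.jar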
DataX (StarRocksWriter) plugin
[svccnetlhs@HOST datax]<231211 17:25:07>$ ls
bin  conf  job  lib  log  log_perf  plugin  script  tmp
[svccnetlhs@HOST datax]<231211 17:25:08>$ cd plugin/
[svccnetlhs@HOST plugin]<231211 17:25:11>$ ls
reader  writer
[svccnetlhs@HOST plugin]<231211 17:25:11>$ cd writer/
[svccnetlhs@HOST writer]<231211 17:25:13>$ ls
adbpgwriter clickhousewriter doriswriter ftpwriter hbase11xsqlwriter hdfswriter kuduwriter mysqlwriter ocswriter oscarwriter postgresqlwriter sqlserverwriter tdenginewriter
adswriter databendwriter drdswriter gdbwriter hbase11xwriter hologresjdbcwriter loghubwriter neo4jwriter odpswriter osswriter rdbmswriter starrockswriter tsdbwriter
cassandrawriter datahubwriter elasticsearchwriter hbase094xwriter hbase20xsqlwriter kingbaseeswriter mongodbwriter oceanbasev10writer oraclewriter otswriter selectdbwriter streamwriter txtfilewriter
[svccnetlhs@HOST writer]<231211 17:25:13>$ cd starrockswriter/
[svccnetlhs@HOST starrockswriter]<231211 17:25:15>$ ls
libs  plugin_job_template.json  plugin.json  starrockswriter-1.1.0.jar
[svccnetlhs@HOST starrockswriter]<231211 17:25:16>$ ls libs/
commons-codec-1.9.jar commons-io-2.4.jar commons-logging-1.1.1.jar datax-common-0.0.1-SNAPSHOT.jar fastjson2-2.0.23.jar hamcrest-core-1.3.jar httpcore-4.4.6.jar logback-core-1.0.13.jar plugin-rdbms-util-0.0.1-SNAPSHOT.jar
commons-collections-3.0.jar commons-lang3-3.3.2.jar commons-math3-3.1.1.jar druid-1.0.15.jar guava-r05.jar httpclient-4.5.3.jar logback-classic-1.0.13.jar mysql-connector-java-5.1.46.jar slf4j-api-1.7.10.jar
[svccnetlhs@HOST starrockswriter]<231211 17:25:21>$
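Under the hood, starrockswriter batches rows and pushes them to StarRocks through the Stream Load HTTP interface on the FE, which is why the job JSON below lists loadUrl entries on port 8030. You can sanity-check that interface by hand before running DataX; a sketch with placeholder credentials, database, and table names:

# hand-rolled Stream Load request against one FE (sample.csv is a local test file)
curl --location-trusted -u StarRocks_user:StarRocks_password \
    -H "label:datax_smoke_test" \
    -H "column_separator:," \
    -T sample.csv \
    http://StarRocksFE_IP:8030/api/database_name/table_name/_stream_load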
Note: both DataX plugins can be downloaded at the beginning of this article.
DataX JSON
As is well known, DataX moves data in three steps: data extraction (reader), data transformation (framework), and data loading (writer).
DataX design philosophy:
DataX framework design:
DataX workflow:
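Before the full COS templates, here is the minimal skeleton that every DataX job shares: a reader and a writer wired together under job.content, with job.setting controlling speed. It uses the built-in streamreader/streamwriter, so it is runnable as-is (the file name is arbitrary):

cat > job/stream_demo.json <<'EOF'
{
    "job": {
        "setting": { "speed": { "channel": 1 } },
        "content": [{
            "reader": {
                "name": "streamreader",
                "parameter": {
                    "sliceRecordCount": 3,
                    "column": [ { "type": "string", "value": "hello" } ]
                }
            },
            "writer": {
                "name": "streamwriter",
                "parameter": { "encoding": "UTF-8", "print": true }
            }
        }]
    }
}
EOF
python3 bin/datax.py job/stream_demo.json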
Job JSON:
Template 1:
(The /* ... */ annotations are explanatory only and must be removed before running, since the job file is plain JSON.)

{
    "job": {
        "setting": {
            "speed": {
                "byte": -1,      /* no byte-rate limit */
                "channel": 3     /* channel set to 3 */
            }
        },
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": [
                            /* 1. The COS files contain no ts column, so "value" supplies a
                               fixed literal. value=2023-11-14 stands for the partition that
                               the current path points at; the script passes this value in
                               dynamically. */
                            { "name": "ts", "type": "string", "value": "2023-11-14" },
                            /* 2. The remaining columns are read positionally via "index". */
                            { "index": 0, "name": "local_id", "type": "string" },
                            { "index": 1, "name": "encrypted_imei", "type": "string" },
                            { "index": 2, "name": "encrypted_idfa", "type": "string" },
                            { "index": 3, "name": "encrypted_mac", "type": "string" },
                            { "index": 4, "name": "encrypted_android_id", "type": "string" }
                        ],
                        "defaultFS": "cosn://bucket-name/",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ",",
                        "fileType": "orc",
                        "hadoopConfig": {
                            "fs.cosn.impl": "org.apache.hadoop.fs.CosFileSystem",
                            "fs.cosn.tmp.dir": "any writable local temp path",
                            "fs.cosn.userinfo.region": "ap-guangzhou",
                            "fs.cosn.userinfo.secretId": "",
                            "fs.cosn.userinfo.secretKey": ""
                        },
                        "path": "/sam/sam_dwd_user_action_cos_d/20231114/part-00011*"
                    }
                },
                "writer": {
                    "name": "starrockswriter",
                    "parameter": {
                        "column": [      /* column names as StarRocks expects to receive them */
                            "ts",
                            "local_id",
                            "encrypted_imei",
                            "encrypted_idfa",
                            "encrypted_mac",
                            "encrypted_android_id"
                        ],
                        "database": "StarRocks database name",
                        "jdbcUrl": "jdbc:mysql://StarRocksFE_IP:9030/",
                        "loadProps": { "max_filter_ratio": 1 },
                        "loadUrl": [
                            "StarRocksFE_IP:8030",
                            "StarRocksFE_IP:8030",
                            "StarRocksFE_IP:8030"
                        ],
                        "password": "StarRocks password",
                        "postSql": [],
                        "preSql": [],
                        "table": "StarRocks table name",
                        "username": "StarRocks user"
                    }
                }
            }
        ]
    }
}
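Since ts and the partition path change per day, the template is usually driven by a wrapper script rather than edited by hand. A sketch of such a loop, assuming the job file replaces the hard-coded values above with ${ts} and ${dt} placeholders (names illustrative):

#!/bin/bash
# launch one DataX run per partition date
for dt in 20231114 20231115 20231116; do
    ts="${dt:0:4}-${dt:4:2}-${dt:6:2}"   # 20231114 -> 2023-11-14
    python3 bin/datax.py \
        -p"-Dts=${ts} -Ddt=${dt}" \
        job/cos_to_starrocks.json
done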
Template 2:
{ "job": { "setting": { "speed": { "channel":3 }, "errorLimit": {} }, "content": [{ "reader": { "name": "hdfsreader", "parameter": { "path": "/sam/sam_dwd_user_action_cos_d/20231114/part-*", "defaultFS": "cosn://*********/", "column": [ {"name":"ts","type":"string","value":"2023-11-14"}, {"name":"import_ds_","type":"string","index":0}, {"name":"unique_action_id","type":"string","index":1}, {"name":"action_time","type":"string","index":2}, {"name":"report_time","type":"string","index":3}, {"name":"action_type","type":"string","index":4}, {"name":"ka_id","type":"string","index":5}, {"name":"action_session_id","type":"string","index":6}, {"name":"uuid","type":"string","index":7}, {"name":"wx_app_id","type":"string","index":8}, {"name":"wx_open_id","type":"string","index":9}, {"name":"wx_union_id","type":"string","index":10}, {"name":"external_user_id","type":"string","index":11}, {"name":"merber_id","type":"string","index":12}, {"name":"local_id","type":"string","