![file](https://img2023.cnblogs.com/other/2685289/202307/2685289-20230726144741004-1172150774.png) By AWS Team ## 前言 隨著企業規模的擴大,業務數據的激增,我們會使用 Hadoop/Sp ...
By AWS Team
前言
隨著企業規模的擴大,業務數據的激增,我們會使用 Hadoop/Spark 框架來處理大量數據的 ETL/聚合分析作業,⽽這些作業將需要由統一的作業調度平臺去定時調度。
在 Amazon EMR 中,可以使用 AWS 提供 Step Function,托管 AirFlow,以及 Apache Oozie 或 Azkaban 進行作業的調用。但隨著 Apache Dolphinscheduler 產品完善、社區日益火爆、且其本身具有簡單易用、高可靠、高擴展性、⽀持豐富的使用場景、提供多租戶模式等特性,越來越多的企業選擇使用該產品作為任務調度的服務。
DolphinScheduler 可以在 Amazon EMR 集群中進行安裝和部署,但是結合 Amazon EMR 本身的特點和使用最佳實踐,不建議客戶使用一個大而全,並且持久運行的 EMR 集群提供整個大數據的相關服務,而是基於不同的維度對集群進行拆分,比如按研發階段(開發、測試、生產)、工作負載(即席查詢、批處理)、對時間敏感性、作業時長要求、組織類型等,因此 DolphinScheduler 作為統一的調度平臺,不需要安裝在某一個固定 EMR 集群上,而是選擇獨立部署,將作業劃分到不同的 EMR 集群上,並以 DAG(Directed Acyclic Graph,DAG)流式方式組裝,實現統一的調度和管理。
此篇文章將介紹 DolphinScheduler 安裝部署,以及在 DolphinScheduler 中進行作業編排,以使用 python 腳本的方式執行 EMR 的任務調度,包括創建集群、集群狀態檢查、提交 EMR Step 作業、EMR Step 作業狀態檢查,所有作業完成後終止集群。
Amazon EMR
Amazon EMR 是一個托管的集群平臺,可簡化在 AWS 上運行大數據框架(如 Apache Hadoop 和 Apache Spark)的過程,以處理和分析海量數據。用戶可一鍵啟動包含了眾多 Hadoop 生態數據處理,分析相關服務的集群,⽽無需手動進行複雜的配置。
Apache DolphinScheduler
Apache DolphinScheduler 是一個分散式易擴展的可視化 DAG 工作流任務調度開源系統。適用於企業級場景,提供了⼀個可視化操作任務、工作流和全生命周期數據處理過程的解決方案。
特性
-
簡單易用
- 可視化 DAG:⽤戶友好的,通過拖拽定義工作流的,運行時控制工具模塊化
- 操作:模塊化,有助於輕鬆定製和維護
-
豐富的使用場景
- 支持多種任務類型:支持 Shell、MR、Spark、SQL 等 10 餘種任務類型,支持跨語言
- 易於擴展豐富的工作流操作:⼯作流程可以定時、暫停、恢復和停止,便於維護和控制全局和本地參數
-
High Reliability
高可靠性:去中心化設計,確保穩定性。原生 HA 任務隊列支持,提供過載容錯能力。DolphinScheduler 能提供高度穩健的環境。
- High Scalability
高擴展性:支持多租戶和線上資源管理。支持每天 10 萬個數據任務的穩定運行。
架構圖:
主要可實現:
- 以 DAG 圖的方式將 Task 按照任務的依賴關係關聯起來,可實時可視化監控任務的運行狀態
- 支持豐富的任務類型:Shell、MR、Spark、SQL(mysql、oceanbase、postgresql、hive、sparksql)、Python、Sub_Process、Procedure 等
- 支持工作流定時調度、依賴調度、手動調度、手動暫停/停止/恢復,同時支持失敗重試/告警、從指定節點恢復失敗、Kill 任務等操作
- 支持工作流優先順序、任務優先順序及任務的故障轉移及任務超時告警/失敗
- 支持工作流全局參數及節點自定義參數設置
- 支持資源文件的線上上傳/下載,管理等,支持線上文件創建、編輯
- 支持任務日誌線上查看及滾動、線上下載日誌等
- 實現集群 HA,通過 Zookeeper 實現 Master 集群和 Worker 集群去中心化
- 支持對 Master/Worker CPU load,memory,CPU 線上查看
- 支持工作流運行歷史樹形/甘特圖展示、支持任務狀態統計、流程狀態統計
- 支持補數
- 支持多租戶
安裝 DolphinScheduler
DolphinScheduler 支持多種部署方式
- 單機部署:Standalone 僅適用於 DolphinScheduler 的快速體驗
- 偽集群部署:偽集群部署目的是在單台機器部署 DolphinScheduler 服務,該模式下 master、worker、api server 都在同⼀台機器上
- 集群部署:集群部署目的是在多台機器部署 DolphinScheduler 服務,用於運行⼤量任務情況
如果你是新手,想要體驗 DolphinScheduler 的功能,推薦使用 Standalone 方式體驗;如果你想體驗更完整的功能,或者更大的任務量,推薦使用偽集群部署;如果你是在生產中使用,推薦使用集群部署或者 kubernetes。
本次實驗將介紹在 AWS 上以偽集群模式部署 DolphinScheduler。
- 啟動⼀台 EC2
在 AWS 公有子網中啟動一臺 EC2,選用 Amazon-linux2,m5.xlarge 安全組開啟 TCP 12345 埠。
- 安裝 JDK,配置 JAVA_HOME 環境
java -version
openjdk version "1.8.0_362"
OpenJDK Runtime Environment (build 1.8.0_362-b08) OpenJDK 64-Bit Server VM (build 25.362-b08, mixed mode)
- 安裝啟動 Zookeeper
bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/src/apache-zookeeper-3.8.1-bin/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone
- 啟動 mysql,這裡選用 Aurora Serverless
- 安裝 AWS CLI2
aws --version
aws-cli/2.11.4 Python/3.11.2 Linux/5.10.167-147.601.amzn2.x86_64 exe/x86_64.amzn.2 prompt/off
- 更新 python 版本到 3.9
python --version
Python 3.9.1
- 下載 DolphinScheduler
cd /usr/local/src
wget https://dlcdn.apache.org/dolphinscheduler/3.1.4/apache-dolphinscheduler-3.1.4-bin.tar.gz
- 配置用戶免密及許可權
# 創建用戶需使用 root 登錄
useradd dolphinscheduler
# 添加密碼
echo "dolphinscheduler" | passwd --stdin dolphinscheduler
# 配置 sudo 免密
sed -i '$adolphinscheduler ALL=(ALL) NOPASSWD: NOPASSWD: ALL' /etc/sudoers
sed -i 's/Defaults requirett/#Defaults requirett/g' /etc/sudoers
# 修改目錄許可權,使得部署用戶對二進位包解壓後的 apache-dolphinscheduler-*-bin 目錄有操作許可權
cd /usr/local/src
chown -R dolphinscheduler:dolphinscheduler apache-dolphinscheduler-*-bin
- 配置機器 SSH 免密登錄
# 切換 dolphinscheduler 用戶
su dolphinscheduler
ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
chmod 600 ~/.ssh/authorized_keys
# 註意:配置完成後,可以通過運行命令 ssh localhost 判斷是否成功,如果不需要輸⼊密碼就能 ssh 登陸則證明成功
- 數據初始化
cd /usr/local/src
# 下載 mysql-connector
wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-j-8.0.31.tar.gz
tar -zxvf mysql-connector-j-8.0.31.tar.gz
# 驅動拷貝
cp mysql-connector-j-8.0.31/mysql-connector-j-8.0.31.jar ./apache-dolphinscheduler-3.1.4-bin/api-server/libs/
cp mysql-connector-j-8.0.31/mysql-connector-j-8.0.31.jar ./apache-dolphinscheduler-3.1.4-bin/alert-server/libs/
cp mysql-connector-j-8.0.31/mysql-connector-j-8.0.31.jar ./apache-dolphinscheduler-3.1.4-bin/master-server/libs/
cp mysql-connector-j-8.0.31/mysql-connector-j-8.0.31.jar ./apache-dolphinscheduler-3.1.4-bin/worker-server/libs/
cp mysql-connector-j-8.0.31/mysql-connector-j-8.0.31.jar ./apache-dolphinscheduler-3.1.4-bin/tools/libs/
# 安裝 mysql 客戶端
# 修改 {mysql-endpoint} 為你 mysql 連接地址
# 修改 {user} 和 {password} 為你 mysql ⽤戶名和密碼
mysql -h {mysql-endpoint} -u{user} -p{password}
mysql> CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
# 修改 {user} 和 {password} 為你希望的用戶名和密碼
mysql> CREATE USER '{user}'@'%' IDENTIFIED BY '{password}';
mysql> GRANT ALL PRIVILEGES ON dolphinscheduler.* TO '{user}'@'%';
mysql> CREATE USER '{user}'@'localhost' IDENTIFIED BY '{password}';
mysql> GRANT ALL PRIVILEGES ON dolphinscheduler.* TO '{user}'@'localhost';
mysql> FLUSH PRIVILEGES;
修改資料庫配置
vi bin/env/dolphinscheduler_env.sh
# Database related configuration, set database type, username and password # 修改 {mysql-endpoint} 為你 mysql 連接地址
# 修改 {user} 和 {password} 為你 mysql ⽤戶名和密碼,{rds-endpoint}為資料庫連接地址
export DATABASE=${DATABASE:-mysql}
export SPRING_PROFILES_ACTIVE=${DATABASE}
export SPRING_DATASOURCE_URL="jdbc:mysql://{rds-endpoint}/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
export SPRING_DATASOURCE_USERNAME={user}
export SPRING_DATASOURCE_PASSWORD={password}
# 執行數據初始化
bash apache-dolphinscheduler/tools/bin/upgrade-schema.sh
- 修改 install_env.sh
cd /usr/local/src/apache-dolphinscheduler
vi bin/env/install_env.sh
# 替換 IP 為 DolphinScheduler 所部署 EC2 私有 IP 地址
ips=${ips:-"10.100.1.220"}
masters=${masters:-"10.100.1.220"}
workers=${workers:-"10.100.1.220:default"}
alertServer=${alertServer:-"10.100.1.220"}
apiServers=${apiServers:-"10.100.1.220"}
installPath=${installPath:-"~/dolphinscheduler"}
- 修改 DolphinScheduler_env.sh
cd /usr/local/src/
mv apache-dolphinscheduler-3.1.4-bin apache-dolphinscheduler
cd ./apache-dolphinscheduler
# 修改 DolphinScheduler 環境變數
vi bin/env/dolphinscheduler_env.sh
export JAVA_HOME=${JAVA_HOME:-/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.362.b08-1.amzn2.0.1.x86_64}
export PYTHON_HOME=${PYTHON_HOME:-/bin/python}
- 啟動 DolphinScheduler
cd /usr/local/src/apache-dolphinscheduler
su dolphinscheduler
bash ./bin/install.sh
- 訪問 DolphinScheduler
URL 訪問使用 IP 為 DolphinScheduler 所部署 EC2 公有 IP 地址 http://ec2-endpoint:12345/dolphinscheduler/ui/login
初始用戶名/密碼 admin/dolphinscheduler123
05
配置 DolphinScheduler
- 建立租戶
2. 將用戶與綁定租戶
- AWS 創建 IAM 策略
{
"Version":"2012-10-17",
"Statement":[
{
"Sid":"ElasticMapReduceActions",
"Effect":"Allow",
"Action":[
"elasticmapreduce:RunJobFlow",
"elasticmapreduce:DescribeCluster",
"elasticmapreduce:AddJobFlowSteps",
"elasticmapreduce:DescribeStep",
"elasticmapreduce:TerminateJobFlows",
"elasticmapreduce:SetTerminationProtection"
],
"Resource":"*"
},
{
"Effect":"Allow",
"Action":[
"iam:GetRole",
"iam:PassRole"
],
"Resource":[
"arn:aws:iam::accountid:role/EMR_DefaultRole",
"arn:aws:iam::accountid:role:role/EMR_EC2_DefaultRole"
]
}
]
}
- 創建 IAM ⾓⾊
進入 AWS IAM,創建角色,並賦予上⼀步所創建的策略
- DolphinScheduler 部署 EC2 綁定角色
將 EC2 綁定上⼀步創建的角色,使 DolphinScheduler 所部署 EC2 具有調用 EMR 許可權。
- python 安裝 boto3,以及要用到其他的組件
sudu pip install boto3
sudu pip install redis
使用DolphinScheduler進行作業編排
以Python方式執行。
作業執行時序圖:
- 創建 EMR 集群創建任務
創建⼀個 EMR 集群,3 個 MASTER,3 個 CORE,指定子網與許可權,以及集群空閑十分鐘後自動終止。具體參數可見鏈接。
import boto3
from datetime import date
import redis
def run_job_flow():
response = client.run_job_flow(
Name='create-emrcluster-'+ d1,
LogUri='s3://s3bucket/elasticmapreduce/',
ReleaseLabel='emr-6.8.0',
Instances={
'KeepJobFlowAliveWhenNoSteps': False,
'TerminationProtected': False,
# 替換{Sunbet-id}為你需要部署的子網 id
'Ec2SubnetId': '{Sunbet-id}',
# 替換{Keypairs-name}為你 ec2 使用密鑰對名稱
'Ec2KeyName': '{Keypairs-name}',
'InstanceGroups': [
{
'Name': 'Master',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.xlarge',
'InstanceCount': 3,
'EbsConfiguration': {
'EbsBlockDeviceConfigs': [
{
'VolumeSpecification': {
'VolumeType': 'gp3',
'SizeInGB': 500
},
'VolumesPerInstance': 1
},
],
'EbsOptimized': True
},
},
{
'Name': 'Core',
'Market': 'ON_DEMAND',
'InstanceRole': 'CORE',
'InstanceType': 'm5.xlarge',
'InstanceCount': 3,
'EbsConfiguration': {
'EbsBlockDeviceConfigs': [
{
'VolumeSpecification': {
'VolumeType': 'gp3',
'SizeInGB': 500
},
'VolumesPerInstance': 1
},
],
'EbsOptimized': True
},
}
],
},
Applications=[{'Name': 'Spark'},{'Name': 'Hive'},{'Name': 'Pig'},{'Name': 'Presto'}],
Configurations=[
{ 'Classification': 'spark-hive-site',
'Properties': {
'hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'}
},
{ 'Classification': 'hive-site',
'Properties': {
'hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'}
},
{ 'Classification': 'presto-connector-hive',
'Properties': {
'hive.metastore.glue.datacatalog.enabled': 'true'}
}
],
JobFlowRole='EMR_EC2_DefaultRole',
ServiceRole='EMR_DefaultRole',
EbsRootVolumeSize=100,
# 集群空閑十分鐘自動終止
AutoTerminationPolicy={
'IdleTimeout': 600
}
)
return response
if __name__ == "__main__":
today = date.today()
d1 = today.strftime("%Y%m%d")
# {region}替換為你需要創建 EMR 的 Region
client = boto3.client('emr',region_name='{region}')
# 創建 EMR 集群
clusterCreate = run_job_flow()
job_id = clusterCreate['JobFlowId']
# 使用 redis 來保存信息,作為 DolphinScheduler job step 的參數傳遞,也可以使用 DolphinScheduler 所使用的 mysql 或者其他方式存儲
# 替換{redis-endpoint}為你 redis 連接地址
pool = redis.ConnectionPool(host='{redis-endpoint}', port=6379, decode_responses=True)
r = redis.Redis(connection_pool=pool)
r.set('cluster_id_'+d1, job_id)
- 創建 EMR 集群狀態檢查任務
檢查 EMR 集群是否創建完畢
import boto3
import redis
import time
from datetime import date
if __name__ == "__main__":
today = date.today()
d1 = today.strftime("%Y%m%d")
# {region}替換為你需要創建 EMR 的 Region
client = boto3.client('emr',region_name='{region}')
# 替換{redis-endpoint}為你 redis 連接地址
pool = redis.ConnectionPool(host='{redis-endpoint}', port=6379, decode_responses=True)
r = redis.Redis(connection_pool=pool)
# 獲取創建的 EMR 集群 id
job_id = r.get('cluster_id_' + d1)
print(job_id)
while True:
result = client.describe_cluster(ClusterId=job_id)
emr_state = result['Cluster']['Status']['State']
print(emr_state)
if emr_state == 'WAITING':
# EMR 集群創建成功
break
elif emr_state == 'FAILED':
# 集群創建失敗
# do something...
break
else:
time.sleep(10)
- 使用創建好的 EMR 集群啟動 spark job
import time
import re
import boto3
from datetime import date
import redis
def generate_step(step_name, step_command):
cmds = re.split('\\s+', step_command)
print(cmds)
if not cmds:
raise ValueError
return {
'Name': step_name,
'ActionOnFailure': 'CANCEL_AND_WAIT',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': cmds
}
}
if __name__ == "__main__":
today = date.today()
d1 = today.strftime("%Y%m%d")
# {region}替換為你需要創建 EMR 的 Region
client = boto3.client('emr',region_name='{region}')
# 獲取 emr 集群 id
# 替換{redis-endpoint}為你 redis 連接地址
pool = redis.ConnectionPool(host='{redis-endpoint}', port=6379, decode_responses=True)
r = redis.Redis(connection_pool=pool)
job_id = r.get('cluster_id_' + d1)
# job 啟動命令
spark_submit_cmd = """spark-submit
s3://s3bucket/file/spark/spark-etl.py
s3://s3bucket/input/
s3://s3bucket/output/spark/"""+d1+'/'
steps = []
steps.append(generate_step("SparkExample_"+d1 , spark_submit_cmd),)
# 提交 EMR Step 作業
response = client.add_job_flow_steps(JobFlowId=job_id, Steps=steps)
step_id = response['StepIds'][0]
# 將作業 id 保存,以便於做任務檢查
r.set('SparkExample_'+d1, step_id)
- 創建 JOB 執⾏情況檢查
import boto3
import redis
import time
from datetime import date
if __name__ == "__main__":
today = date.today()
d1 = today.strftime("%Y%m%d")
# {region}替換為你需要創建 EMR 的 Region
client = boto3.client('emr',region_name='{region}')
# 替換{redis-endpoint}為你 redis 連接地址
pool = redis.ConnectionPool(host='{redis-endpoint}', port=6379, decode_responses=True)
r = redis.Redis(connection_pool=pool)
job_id = r.get('cluster_id_' + d1)
step_id = r.get('SparkExample_' + d1)
print(job_id)
print(step_id)
while True:
# 查詢作業執行結果
result = client describe_step(ClusterId=job_id,StepId=step_id)
emr_state = result['Step']['Status']['State']
print(emr_state)
if emr_state == 'COMPLETED':
# 作業執行完成
break
elif emr_state == 'FAILED'
# 作業執行失敗
# do somethine
# ......
break
else:
time.sleep(10)
- 設置執⾏順序
在 DolphinScheduler – 項目管理 – 工作流 – 工作流定義中創建工作流,並創建 python 任務,將以上 python 腳本作為任務串聯起來
- 保存並上線
保存任務並點擊上線
- 執行
可以點擊立即執行,或指定計劃任務按時執行。
在 EMR 中查看執行情況
EMR 創建情況——正在啟動
EMR Step執行情況——正在執行
- 檢查執行結果以及執行⽇志
在 DolphinScheduler – 項目管理 – 工作流 – 工作流實例中檢查執行狀態,以及執行日誌
在 EMR 中查看執⾏情況
EMR 創建情況——正在等待
Step 執行情況——完成
- 終止集群
對於臨時性執行作業或者每天定時執行的批處理作業,可以在作業結束後終⽌ EMR 集群以節省成本(EMR 使用最佳實踐)。終止 EMR 集群可以使用 EMR 本身功能在空閑後自動終止,或者手動調用中止。
自動終止 EMR 集群,在創建集群中進行配置
AutoTerminationPolicy={
'IdleTimeout': 600
}
此集群將在作業執行完空閑十分鐘後自動終止
手動終止 EMR 集群:
import boto3
from datetime import date
import redis
if __name__ == "__main__":
today = date.today()
d1 = today.strftime("%Y%m%d")
# 獲取集群 id
# {region}替換為你需要創建 EMR 的 Region
client = boto3.client('emr',region_name='{region}')
# 替換{redis-endpoint}為你 redis 連接地址
pool = redis.ConnectionPool(host='{redis-endpoint}', port=6379, decode_responses=True)
r = redis.Redis(connection_pool=pool)
job_id = r.get('cluster_id_' + d1)
# 關閉集群終止保護
client.set_termination_protection(JobFlowIds=[job_id],TerminationProtected=False)
# 終止集群
client.terminate_job_flows(JobFlowIds=[job_id])
將此腳本加⼊到 DolphinScheduler 作業流中,作業流在全部任務執行完成後執行該腳本以實現終止 EMR 集群。
總結
隨著企業大數據分析平臺的應⽤,越來越多數據處理流程/處理任務需要利用一個簡單易用的調度系統去理清其錯綜複雜的依賴關係,並且按執行計划進行編排調度,同時需要提供易使用易擴展的可視化 DAG 能力,而 Apache DolphinScheduler 正好滿足了以上需求。
本文介紹了在 AWS 上獨立部署 DolphinScheduler,並利用 EMR 的特性,結合最佳實踐,展示了從創建 EMR 集群到提交 ETL 作業,最後作業執行全部完成後將集群進行終止,形成⼀個完整的作業處理的流程。用戶可以參考該文檔快速的部署搭建自己的大數據調度體系。
作者
王驍,AWS 解決方案架構師,負責基於 AWS 雲計算方案架構的咨詢和設計,在國內推廣 AWS 雲平臺技術和各種解決方案,具有豐富的企業 IT 架構經驗,目前側重於於大數據領域的研究。
本文由 白鯨開源 提供發佈支持!