Spark是現在應用最廣泛的分散式計算框架,oozie支持在它的調度中執行spark。在我的日常工作中,一部分工作就是基於oozie維護好每天的spark離線任務,合理的設計工作流並分配適合的參數對於spark的穩定運行十分重要。 Spark Action 這個Action允許執行spark任務,需 ...
Spark是現在應用最廣泛的分散式計算框架,oozie支持在它的調度中執行spark。在我的日常工作中,一部分工作就是基於oozie維護好每天的spark離線任務,合理的設計工作流並分配適合的參數對於spark的穩定運行十分重要。
Spark Action
這個Action允許執行spark任務,需要用戶指定job-tracker以及name-node。先看看語法規則:
語法規則
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.3">
...
<action name="[NODE-NAME]">
<spark xmlns="uri:oozie:spark-action:0.1">
<job-tracker>[JOB-TRACKER]</job-tracker>
<name-node>[NAME-NODE]</name-node>
<prepare>
<delete path="[PATH]"/>
...
<mkdir path="[PATH]"/>
...
</prepare>
<job-xml>[SPARK SETTINGS FILE]</job-xml>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
<master>[SPARK MASTER URL]</master>
<mode>[SPARK MODE]</mode>
<name>[SPARK JOB NAME]</name>
<class>[SPARK MAIN CLASS]</class>
<jar>[SPARK DEPENDENCIES JAR / PYTHON FILE]</jar>
<spark-opts>[SPARK-OPTIONS]</spark-opts>
<arg>[ARG-VALUE]</arg>
...
<arg>[ARG-VALUE]</arg>
...
</spark>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>
prepare元素
它裡面可以執行刪除文件或者創建目錄的操作,比如
<delete path="hdfs://xxxx/a"/>
<mkdir path="hdfs://xxxx"/>
一般來說,離線的spark任務最重都會生成一些數據,這些數據可能存儲到資料庫中,也可能直接存儲到hdfs,如果存儲到hdfs就涉及到清除目錄了。比如你可能在測試環境需要頻繁的重覆運行spark任務,那麼每次都需要清除目錄文件,創建新的目錄才行。
job-xml
spark 任務的參數也可以放在job-xml所在的xml中。
confugration
這裡面的配置的參數將會傳遞給spark任務。
master
spark運行的模式,表示spark連接的集群管理器。預設可以使spark的獨立集群(spark://host:port)或者是mesos(mesos://host:port)或者是yarn(yarn),以及本地模式local
mode
因為spark任務也可以看做主節點和工作節點模式,主節點就是驅動程式。這個驅動程式既可以運行在提交任務的機器,也可以放在集群中運行。
這個參數就是用來設置,驅動程式是以客戶端的形式運行在本地機器,還是以集群模式運行在集群中。
name
spark應用的名字
class
spark應用的主函數
jar
spark應用的jar包
spark-opts
提交給驅動程式的參數。比如--conf key=value
或者是在oozie-site.xml中配置的oozie.service.SparkConfiguationService.spark.configurations
。
arg
這個參數是用來提交給spark應用的參數
例子
官網給出的例子:
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="myfirstsparkjob">
<spark xmlns="uri:oozie:spark-action:0.1">
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<prepare>
<delete path="${jobOutput}"/>
</prepare>
<configuration>
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
</configuration>
<master>local[*]</master>
<mode>client<mode>
<name>Spark Example</name>
<class>org.apache.spark.examples.mllib.JavaALS</class>
<jar>/lib/spark-examples_2.10-1.1.0.jar</jar>
<spark-opts>--executor-memory 20G --num-executors 50</spark-opts>
<arg>inputpath=hdfs://localhost/input/file.txt</arg>
<arg>value=2</arg>
</spark>
<ok to="myotherjob"/>
<error to="errorcleanup"/>
</action>
...
</workflow-app>
我自己工作時的例子:
<action name="NODE1">
<spark xmlns="uri:oozie:spark-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>cluster</mode>
<name>NODE1</name>
<class>com.test.NODE1_Task</class>
<jar>/lib/dw.jar</jar>
<spark-opts>--executor-memory 1G --num-executors 6 --executor-cores 1 --conf spark.storage.memoryFraction=0.8</spark-opts>
<arg>參數1</arg>
<arg>參數2</arg>
<arg>參數3</arg>
</spark>
</action>
日誌
spark action日誌會重定向到oozie的mapr啟動程式的stdout/stderr中。
通過oozie的web控制條,可以看到spark的日誌。
spark on yarn
如果想要把spark運行在yarn上,需要按照下麵的步驟執行:
- 在spark action中載入spark-assembly包
- 指定master為yarn-client或者yarn-master
為了確保spark工作在spark歷史伺服器中可以查到,需要保證在--conf中或者oozie.service.SparkConfiturationService.spark.configrations
中設置下麵的三個參數:
- spark.yarn.historyServer.address=http://spark-host:18088
- spark.eventLog.dir=hdfs://NN:8020/user/spark/applicationHistory
- spark.eventLog.enabled=true
spark action的schema
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
xmlns:spark="uri:oozie:spark-action:0.1" elementFormDefault="qualified"
targetNamespace="uri:oozie:spark-action:0.1"> <xs:element name="spark" type="spark:ACTION"/>
<xs:complexType name="ACTION">
<xs:sequence>
<xs:element name="job-tracker" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="name-node" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="prepare" type="spark:PREPARE" minOccurs="0" maxOccurs="1"/>
<xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="configuration" type="spark:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
<xs:element name="master" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="mode" type="xs:string" minOccurs="0" maxOccurs="1"/>
<xs:element name="name" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="class" type="xs:string" minOccurs="0" maxOccurs="1"/>
<xs:element name="jar" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="spark-opts" type="xs:string" minOccurs="0" maxOccurs="1"/>
<xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="CONFIGURATION">
<xs:sequence>
<xs:element name="property" minOccurs="1" maxOccurs="unbounded">
<xs:complexType>
<xs:sequence>
<xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
<xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
<xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
<xs:complexType name="PREPARE">
<xs:sequence>
<xs:element name="delete" type="spark:DELETE" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="mkdir" type="spark:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="DELETE">
<xs:attribute name="path" type="xs:string" use="required"/>
</xs:complexType>
<xs:complexType name="MKDIR">
<xs:attribute name="path" type="xs:string" use="required"/>
</xs:complexType>
</xs:schema>