YARN DistributedShell源碼分析與修改 轉載請註明出處: "http://www.cnblogs.com/BYRans/" <br/ "1 概述" "2 YARN DistributedShell不能滿足當前需求" "2.1 功能需求" "2.2 YARN Distr...
YARN DistributedShell源碼分析與修改
YARN版本:2.6.0
轉載請註明出處:http://www.cnblogs.com/BYRans/
1 概述
Hadoop YARN項目自帶一個非常簡單的應用程式編程實例--DistributedShell。DistributedShell是一個構建在YARN之上的non-MapReduce應用示例。它的主要功能是在Hadoop集群中的多個節點,並行執行用戶提供的shell命令或shell腳本(將用戶提交的一串shell命令或者一個shell腳本,由ApplicationMaster控制,分配到不同的container中執行)。
2 YARN DistributedShell不能滿足當前需求
2.1 功能需求
我所參與的項目通過融合Hive、MapReduce、Spark、Kafka等大數據開源組件,搭建了一個數據分析平臺。
平臺需要新增一個功能:
- 在集群中選取一個節點,執行用戶提交的jar包。
- 該功能需要與平臺已有的基於Hive、MR、Spark實現的業務以及YARN相融合。
- 簡而言之,經分析與調研,我們需要基於YARN的DistributedShell實現該功能。
該功能需要實現:
- 單機執行用戶自己提交的jar包
- 用戶提交的jar包會有其他jar包的依賴
- 用戶提交的jar包只能選取一個節點運行
- 用戶提交的jar包需要有緩存數據的目錄
2.2 YARN DistributedShell對需求的支持情況
YARN的DistributedShell功能為:
- 支持執行用戶提供的shell命令或腳本
- 執行節點數可以通過參數num_containers設置,預設值為1
- 不支持jar包的執行
- 更不支持依賴包的提交
- 不支持jar包緩存目錄的設置
2.3 需要對YARN DistributedShell進行的修改
- 增加支持執行jar包功能
- 增加支持緩存目錄設置功能
- 刪除執行節點數設置功能,不允許用戶設置執行節點數,將執行節點數保證值為1
3 YARN DistributedShell源碼獲取
YARN DistributedShell源碼可以在GitHub上apache/hadoop獲取,hadoop repository中DistributedShell的源代碼路徑為:
hadoop/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/
這裡修改的是2.6.0版本源碼。
4 YARN DistributedShell源碼分析及修改
YARN DistributedShell包含4個java Class:
DistributedShell
├── Client.java
├── ApplicationMaster.java
├── DSConstants.java
├── Log4jPropertyHelper.java
- Client:客戶端提交application
- ApplicationMaster:註冊AM,申請分配container,啟動container
- DSConstants:Client類和ApplicationMaster類中的常量定義
- Log4jPropertyHelper:載入Log4j配置
4.1 Client類
4.1.1 Client源碼邏輯
Client類是DistributedShell應用提交到YARN的客戶端。Client將啟動application master,然後application master啟動多個containers用於運行shell命令或腳本。Client運行邏輯為:
- 使用ApplicationClientProtocol協議連接ResourceManager(也叫ApplicationsMaster或ASM),獲取一個新的ApplicationId。(ApplicationClientProtocol提供給Client一個獲取集群信息的方式)
- 在一個job提交過程中,Client首先創建一個ApplicationSubmissionContext。ApplicationSubmissionContext定義了application的詳細信息,例如:ApplicationId、application name、application分配的優先順序、application分配的隊列。另外,ApplicationSubmissionContext還定義了一個Container,該Container用於啟動ApplicationMaster。
- 在ContainerLaunchContext中需要初始化啟動ApplicationMaster的資源:
- 運行ApplicationMaster的container的資源
- jars(例:AppMaster.jar)、配置文件(例:log4j.properties)
- 運行環境(例:hadoop特定的類路徑、java classpath)
- 啟動ApplicationMaster的命令
- Client使用ApplicationSubmissionContext提交application到ResourceManager,並通過按周期向ResourceManager請求ApplicationReport,完成對applicatoin的監控。
- 如果application運行時間超過timeout的限制(預設為600000毫秒,可通過-timeout進行設置),client將發送KillApplicationRequest到ResourceManager,將application殺死。
具體代碼如下(基於YARN2.6.0):
- Cilent的入口main方法:
public static void main(String[] args) {
boolean result = false;
try {
DshellClient client = new DshellClient();
LOG.info("Initializing Client");
try {
boolean doRun = client.init(args);
if (!doRun) {
System.exit(0);
}
} catch (IllegalArgumentException e) {
System.err.println(e.getLocalizedMessage());
client.printUsage();
System.exit(-1);
}
result = client.run();
} catch (Throwable t) {
LOG.fatal("Error running Client", t);
System.exit(1);
}
if (result) {
LOG.info("Application completed successfully");
System.exit(0);
}
LOG.error("Application failed to complete successfully");
System.exit(2);
}
main方法:
- 輸入參數為用戶CLI的執行命令,例如:
hadoop jar hadoop-yarn-applications-distributedshell-2.0.5-alpha.jar org.apache.hadoop.yarn.applications.distributedshell.Client -jar hadoop-yarn-applications-distributedshell-2.0.5-alpha.jar -shell_command '/bin/date' -num_containers 10
,該命令提交的任務為:啟動10個container,每個都執行date
命令。 - main方法將運行init方法,如果init方法返回true則運行run方法。
- init方法解析用戶提交的命令,解析用戶命令中的參數值。
- run方法將完成Client源碼邏輯中描述的功能。
4.1.2 對Client源碼的修改
在原有YARN DistributedShell的基礎上做的修改如下:
- 在CLI為用戶增加了
container_files
和container_archives
兩個參數container_files
指定用戶要執行的jar包的依賴包,多個依賴包以逗號分隔container_archives
指定用戶執行的jar包的緩存目錄,多個目錄以逗號分隔
- 刪除
num_containers
參數- 不允許用戶設置container的個數,使用預設值1
對Client源碼修改如下:
- 變數
- 增加變數用於保存
container_files
和container_archives
兩個參數的值
- 增加變數用於保存
// 增加兩個變數,保存container_files、container_archives的參數值↓↓↓↓↓↓↓
private String[] containerJarPaths = new String[0];
private String[] containerArchivePaths = new String[0];
// ↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
- Client構造方法
- 刪除num_containers參數的初試化,增加
container_files
和container_archives
兩個參數 - 修改構造方法的ApplicationMaster類
- 刪除num_containers參數的初試化,增加
// 刪除num_containers項,不允許用戶設置containers個數,containers個數預設為1 ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
//opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
// ↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
// 添加container_files、container_archives的描述↓↓↓↓↓↓↓↓↓↓↓↓↓↓
this.opts.addOption("container_files", true,"The files that containers will run . Separated by comma");
this.opts.addOption("container_archives", true,"The archives that containers will unzip. Separated by comma");
// ↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
public DshellClient(Configuration conf) throws Exception {
// 修改構造方法的ApplicationMaster類↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
this("org.apache.hadoop.yarn.applications.distributedshell.DshellApplicationMaster",conf);
// ↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
}
- init方法
- 增加
container_files
和container_archives
兩個參數的解析
- 增加
// 初始化選項container_files、container_archives↓↓↓↓↓↓↓
this.opts.addOption("container_files", true,"The files that containers will run . Separated by comma");
this.opts.addOption("container_archives", true,"The archives that containers will unzip. Separated by comma");
// ↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
- run方法
- 上傳
container_files
和container_archives
兩個參數指定的依賴包和緩存目錄至HDFS
- 上傳
// 上傳container_files指定的jar包到HDFS ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
if (this.containerJarPaths.length != 0)
for (int i = 0; i < this.containerJarPaths.length; i++) {
String hdfsJarLocation = "";
String[] jarNameSplit = this.containerJarPaths[i].split("/");
String jarName = jarNameSplit[(jarNameSplit.length - 1)];
long hdfsJarLen = 0L;
long hdfsJarTimestamp = 0L;
if (!this.containerJarPaths[i].isEmpty()) {
Path jarSrc = new Path(this.containerJarPaths[i]);
String jarPathSuffix = this.appName + "/" + appId.toString() +
"/" + jarName;
Path jarDst = new Path(fs.getHomeDirectory(), jarPathSuffix);
fs.copyFromLocalFile(false, true, jarSrc, jarDst);
hdfsJarLocation = jarDst.toUri().toString();
FileStatus jarFileStatus = fs.getFileStatus(jarDst);
hdfsJarLen = jarFileStatus.getLen();
hdfsJarTimestamp = jarFileStatus.getModificationTime();
env.put(DshellDSConstants.DISTRIBUTEDJARLOCATION + i,
hdfsJarLocation);
env.put(DshellDSConstants.DISTRIBUTEDJARTIMESTAMP + i,
Long.toString(hdfsJarTimestamp));
env.put(DshellDSConstants.DISTRIBUTEDJARLEN + i,
Long.toString(hdfsJarLen));
}
}
// ↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
// 上傳container_archives到HDFS↓↓↓↓↓↓↓↓↓↓↓↓↓↓
long hdfsArchiveLen;
String archivePathSuffix;
Path archiveDst;
FileStatus archiveFileStatus;
if (this.containerArchivePaths.length != 0) {
for (int i = 0; i < this.containerArchivePaths.length; i++) {
String hdfsArchiveLocation = "";
String[] archiveNameSplit = this.containerArchivePaths[i].split("/");
String archiveName = archiveNameSplit[(archiveNameSplit.length - 1)];
hdfsArchiveLen = 0L;
long hdfsArchiveTimestamp = 0L;
if (!this.containerArchivePaths[i].isEmpty()) {
Path archiveSrc = new Path(this.containerArchivePaths[i]);
archivePathSuffix = this.appName + "/" + appId.toString() +
"/" + archiveName;
archiveDst = new Path(fs.getHomeDirectory(),
archivePathSuffix);
fs.copyFromLocalFile(false, true, archiveSrc, archiveDst);
hdfsArchiveLocation = archiveDst.toUri().toString();
archiveFileStatus = fs.getFileStatus(archiveDst);
hdfsArchiveLen = archiveFileStatus.getLen();
hdfsArchiveTimestamp = archiveFileStatus
.getModificationTime();
env.put(DshellDSConstants.DISTRIBUTEDARCHIVELOCATION + i,
hdfsArchiveLocation);
env.put(DshellDSConstants.DISTRIBUTEDARCHIVETIMESTAMP + i,
Long.toString(hdfsArchiveTimestamp));
env.put(DshellDSConstants.DISTRIBUTEDARCHIVELEN + i,
Long.toString(hdfsArchiveLen));
}
}
}
// ↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
4.2 ApplicationMaster類
4.2.1 ApplicationMaster源碼邏輯
一個ApplicationMaster將在啟動一個或過個container,在container上執行shell命令或腳本。ApplicationMaster運行邏輯為:
- ResourceManager啟動一個container用於運行ApplicationMaster。
- ApplicationMaster連接ResourceManager,向ResourceManager註冊自己。
- 向ResourceManager註冊的信息有:
- ApplicationMaster的ip:port
- ApplicationMaster所在主機的hostname
- ApplicationMaster的tracking url。客戶端可以用tracking url來跟蹤任務的狀態和歷史記錄。
- 需要註意的是:在DistributedShell中,不需要初註冊tracking url和 appMasterHost:appMasterRpcPort,只需要設置hostname。
- 向ResourceManager註冊的信息有:
- ApplicationMaster會按照設定的時間間隔向ResourceManager發送心跳。ResourceManager的ApplicationMasterService每次收到ApplicationMaster的心跳信息後,會同時在AMLivelinessMonitor更新其最近一次發送心跳的時間。
- ApplicationMaster通過ContainerRequest方法向ResourceManager發送請求,申請相應數目的container。在發送申請container請求前,需要初始化Request,需要初始化的參數有:
- Priority:請求的優先順序
- capability:當前支持CPU和Memory
- nodes:申請的container所在的host(如果不需要指定,則設為null)
- racks:申請的container所在的rack(如果不需要指定,則設為null)
- ResourceManager返回ApplicationMaster的申請的containers信息,根據container的狀態-containerStatus,更新已申請成功和還未申請的container數目。
- 申請成功的container,ApplicationMaster則通過ContainerLaunchContext初始化container的啟動信息。初始化container後啟動container。需要初始化的信息有:
- Container id
- 執行資源(Shell腳本或命令、處理的數據)
- 運行環境
- 運行命令
- container運行期間,ApplicationMaster對container進行監控。
- job運行結束,ApplicationMaster發送FinishApplicationMasterRequest請求給ResourceManager,完成ApplicationMaster的註銷。
具體代碼如下(基於YARN2.6.0):
- ApplicationMaster的入口main方法:
public static void main(String[] args) {
boolean result = false;
try {
DshellApplicationMaster appMaster = new DshellApplicationMaster();
LOG.info("Initializing ApplicationMaster");
boolean doRun = appMaster.init(args);
if (!doRun) {
System.exit(0);
}
appMaster.run();
result = appMaster.finish();
} catch (Throwable t) {
LOG.fatal("Error running ApplicationMaster", t);
LogManager.shutdown();
ExitUtil.terminate(1, t);
}
if (result) {
LOG.info("Application Master completed successfully. exiting");
System.exit(0);
} else {
LOG.info("Application Master failed. exiting");
System.exit(2);
}
}
main方法:
- 輸入參數為Client提交的執行命令。
- init方法完成對執行命令的解析,獲取執行命令中參數指定的值。
- run方法完成ApplicationMaster的啟動、註冊、containers的申請、分配、監控等功能的啟動。
- run方法中建立了與ResourceManager通信的Handle-AMRMClientAsync,其中的CallbackHandler是由RMCallbackHandler類實現的。
- RMCallbackHandler類中實現了containers的申請、分配等方法。
- containers的分配方法onContainersAllocated中通過LaunchContainerRunnable類中run方法完成container的啟動。
- run方法中建立了與ResourceManager通信的Handle-AMRMClientAsync,其中的CallbackHandler是由RMCallbackHandler類實現的。
- finish方法完成container的停止、ApplicationMaster的註銷。
4.2.2 對ApplicationMaster源碼的修改
在原有YARN DistributedShell的基礎上做的修改如下:
- 在ApplicationMaster初試化時,增加對
container_files
和container_archives
兩個參數指定值的支持。即:初始化container_files
和container_archives
指定的運行資源在HDFS上的信息。 - 在container運行時,從HDFS上載入
container_files
和container_archives
指定的資源。
對ApplicationMaster源碼修改如下:
- 變數
- 增加變數,用於保存
container_files
和container_archives
指定的運行資源在HDFS上的信息。
- 增加變數,用於保存
// 增加container_files、container_archives選項值變數 ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
private ArrayList<DshellFile> scistorJars = new ArrayList();
private ArrayList<DshellArchive> scistorArchives = new ArrayList();
// ↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
- ApplicationMaster的init方法
- 初始化
container_files
和container_archives
兩個參數指定值信息。
- 初始化
// 遍歷envs,把所有的jars、archivers的HDFS路徑,時間戳,LEN全部保存到jarPaths對象數組中 ↓↓↓↓↓↓↓↓↓↓
for (String key : envs.keySet()) {
if (key.contains(DshellDSConstants.DISTRIBUTEDJARLOCATION)) {
DshellFile scistorJar = new DshellFile();
scistorJar.setJarPath((String) envs.get(key));
String num = key
.split(DshellDSConstants.DISTRIBUTEDJARLOCATION)[1];
scistorJar.setTimestamp(Long.valueOf(Long.parseLong(
(String) envs
.get(DshellDSConstants.DISTRIBUTEDJARTIMESTAMP + num))));
scistorJar.setSize(Long.valueOf(Long.parseLong(
(String) envs
.get(DshellDSConstants.DISTRIBUTEDJARLEN + num))));
this.scistorJars.add(scistorJar);
}
}
for (String key : envs.keySet()) {
if (key.contains(DshellDSConstants.DISTRIBUTEDARCHIVELOCATION)) {
DshellArchive scistorArchive = new DshellArchive();
scistorArchive.setArchivePath((String) envs.get(key));
String num = key
.split(DshellDSConstants.DISTRIBUTEDARCHIVELOCATION)[1];
scistorArchive.setTimestamp(Long.valueOf(Long.parseLong(
(String) envs
.get(DshellDSConstants.DISTRIBUTEDARCHIVETIMESTAMP +
num))));
scistorArchive.setSize(Long.valueOf(Long.parseLong(
(String) envs
.get(DshellDSConstants.DISTRIBUTEDARCHIVELEN + num))));
this.scistorArchives.add(scistorArchive);
}
}
// ↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
- LaunchContainerRunnable的run方法(container線程的run方法)
- 從HDFS上載入
container_files
和container_archives
指定的資源。
- 從HDFS上載入
// 把HDFS中的jar、archive載入到container的LocalResources,也就是從HDFS分發到container節點的過程 ↓↓↓↓↓↓↓↓↓↓↓↓↓
for (DshellFile perJar : DshellApplicationMaster.this.scistorJars) {
LocalResource jarRsrc = (LocalResource) Records.newRecord(LocalResource.class);
jarRsrc.setType(LocalResourceType.FILE);
jarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
try {
jarRsrc.setResource(
ConverterUtils.getYarnUrlFromURI(new URI(perJar.getJarPath()
.toString())));
} catch (URISyntaxException e1) {
DshellApplicationMaster.LOG.error("Error when trying to use JAR path specified in env, path=" +
perJar.getJarPath(), e1);
DshellApplicationMaster.this.numCompletedContainers.incrementAndGet();
DshellApplicationMaster.this.numFailedContainers.incrementAndGet();
return;
}
jarRsrc.setTimestamp(perJar.getTimestamp().longValue());
jarRsrc.setSize(perJar.getSize().longValue());
String[] tmp = perJar.getJarPath().split("/");
localResources.put(tmp[(tmp.length - 1)], jarRsrc);
}
String[] tmp;
for (DshellArchive perArchive : DshellApplicationMaster.this.scistorArchives) {
LocalResource archiveRsrc =
(LocalResource) Records.newRecord(LocalResource.class);
archiveRsrc.setType(LocalResourceType.ARCHIVE);
archiveRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
try {
archiveRsrc.setResource(
ConverterUtils.getYarnUrlFromURI(new URI(perArchive
.getArchivePath().toString())));
} catch (URISyntaxException e1) {
DshellApplicationMaster.LOG.error("Error when trying to use ARCHIVE path specified in env, path=" +
perArchive.getArchivePath(),
e1);
DshellApplicationMaster.this.numCompletedContainers.incrementAndGet();
DshellApplicationMaster.this.numFailedContainers.incrementAndGet();
return;
}
archiveRsrc.setTimestamp(perArchive.getTimestamp().longValue());
archiveRsrc.setSize(perArchive.getSize().longValue());
tmp = perArchive.getArchivePath().split("/");
String[] tmptmp = tmp[(tmp.length - 1)].split("[.]");
localResources.put(tmptmp[0], archiveRsrc);
}
// ↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
4.3 DSConstants類
DSConstants類中是在Client和ApplicationMaster中的常量,對DSConstants類的修改為:增加了container_files、container_archives相關常量。修改代碼如下:
// 增加container_files、container_archives相關常量 ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
public static final String DISTRIBUTEDJARLOCATION = "DISTRIBUTEDJARLOCATION";
public static final String DISTRIBUTEDJARTIMESTAMP = "DISTRIBUTEDJARTIMESTAMP";
public static final String DISTRIBUTEDJARLEN = "DISTRIBUTEDJARLEN";
public static final String DISTRIBUTEDARCHIVELOCATION = "DISTRIBUTEDARCHIVELOCATION";
public static final String DISTRIBUTEDARCHIVETIMESTAMP = "DISTRIBUTEDARCHIVETIMESTAMP";
public static final String DISTRIBUTEDARCHIVELEN = "DISTRIBUTEDARCHIVELEN";
// ↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
4.4 Log4jPropertyHelper類
對Log4jPropertyHelper類無任何改動。