工作中發現在oozie中使用sqoop與在shell中直接調度sqoop性能上有很大的差異。為了更深入的探索其中的緣由,開始了oozie的源碼分析之路。今天第一天閱讀源碼,由於沒有編譯成功,不能運行測試用例,直接使用sublime肉眼閱讀,還是挺費勁的。 雖然流程還不是順暢,但是大體上的內容還算是了 ...
工作中發現在oozie中使用sqoop與在shell中直接調度sqoop性能上有很大的差異。為了更深入的探索其中的緣由,開始了oozie的源碼分析之路。今天第一天閱讀源碼,由於沒有編譯成功,不能運行測試用例,直接使用sublime肉眼閱讀,還是挺費勁的。
雖然流程還不是順暢,但是大體上的內容還算是瞭解了。
我這裡使用的是oozie4.2的版本,之前稍微看過4.3版本的,源碼上還是有一定的差異的。
看上面的圖,大致理解oozie的過程是:
- oozie cli提交任務
- oozie server創建一個對應任務的client
- client去提交相應的任務
oozie工程結構
最重要的就是三個:
- 1 client 這是任務提交的入口
- 2 core 這是oozie的核心(在3中好像拆分成了core和server)
- 3 distro 這裡保存了啟動腳本
尋找源碼入口
- 一種方式是直接以文件夾搜索main方法。
- 另一種是看它的啟動腳本。
在啟動腳本中oozie.cmd,有這樣一句:
%JAVA_BIN% %JAVA_PROPERTIES% -cp %OOZIECPPATH% org.apache.oozie.cli.OozieCLI %OOZIE_PROPERTIES%
可見,入口在org.apache.oozie.cli.OozieCLI
這個類中,那就從它開始吧。
sqoop作業的提交
首先是OozieCLI的入口main方法:
public static void main(String[] args) {
//oozie方法的入口
if (!System.getProperties().containsKey(AuthOozieClient.USE_AUTH_TOKEN_CACHE_SYS_PROP)) {
System.setProperty(AuthOozieClient.USE_AUTH_TOKEN_CACHE_SYS_PROP, "true");
}
System.exit(new OozieCLI().run(args));
}
前面是一些認證的東西,可以忽略,直接進入run方法:
public synchronized int run(String[] args) {
//保證clent僅啟動一次
if (used) {
throw new IllegalStateException("CLI instance already used");
}
used = true;
//創建參數解析器
final CLIParser parser = getCLIParser();
try {
final CLIParser.Command command = parser.parse(args);
String doAsUser = command.getCommandLine().getOptionValue(DO_AS_OPTION);
if (doAsUser != null) {
OozieClient.doAs(doAsUser, new Callable<Void>() {
@Override
public Void call() throws Exception {
processCommand(parser, command);
return null;
}
});
}
else {
processCommand(parser, command);
}
return 0;
}
...
}
主要的內容是在這個processCommand裡面,processCommand會根據命令調用相應的命令方法:
public void processCommand(CLIParser parser, CLIParser.Command command) throws Exception {
if (command.getName().equals(HELP_CMD)) {
parser.showHelp(command.getCommandLine());
}
else if (command.getName().equals(JOB_CMD)) {
jobCommand(command.getCommandLine());
}
else if (command.getName().equals(JOBS_CMD)) {
jobsCommand(command.getCommandLine());
}
else if (command.getName().equals(ADMIN_CMD)) {
adminCommand(command.getCommandLine());
}
else if (command.getName().equals(VERSION_CMD)) {
versionCommand();
}
else if (command.getName().equals(VALIDATE_CMD)) {
validateCommand(command.getCommandLine());
}
else if (command.getName().equals(SLA_CMD)) {
slaCommand(command.getCommandLine());
}
else if (command.getName().equals(PIG_CMD)) {
scriptLanguageCommand(command.getCommandLine(), PIG_CMD);
}
else if (command.getName().equals(HIVE_CMD)) {
scriptLanguageCommand(command.getCommandLine(), HIVE_CMD);
}
else if (command.getName().equals(SQOOP_CMD)) {
sqoopCommand(command.getCommandLine());//我關註的sqoop在這裡
}
else if (command.getName().equals(INFO_CMD)) {
infoCommand(command.getCommandLine());
}
else if (command.getName().equals(MR_CMD)){
mrCommand(command.getCommandLine());
}
}
在sqoopCommand方法裡面,sqoop任務被提交:
private void sqoopCommand(CommandLine commandLine) throws IOException, OozieCLIException {
List<String> args = commandLine.getArgList();
if (args.size() > 0) {
// checking if args starts with -X (because CLIParser cannot check this)
if (!args.get(0).equals("-X")) {
throw new OozieCLIException("Unrecognized option: " + args.get(0) + " Expecting -X");
}
args.remove(0);
}
if (!commandLine.hasOption(SQOOP_COMMAND_OPTION)) {
throw new OozieCLIException("Need to specify -command");
}
if (!commandLine.hasOption(CONFIG_OPTION)) {
throw new OozieCLIException("Need to specify -config <configfile>");
}
try {
XOozieClient wc = createXOozieClient(commandLine);
Properties conf = getConfiguration(wc, commandLine);
String[] command = commandLine.getOptionValues(SQOOP_COMMAND_OPTION);
System.out.println(JOB_ID_PREFIX + wc.submitSqoop(conf, command, args.toArray(new String[args.size()])));
}
catch (OozieClientException ex) {
throw new OozieCLIException(ex.toString(), ex);
}
}
最重要的內容就在這幾行:
XOozieClient wc = createXOozieClient(commandLine);
Properties conf = getConfiguration(wc, commandLine);
String[] command = commandLine.getOptionValues(SQOOP_COMMAND_OPTION);
System.out.println(JOB_ID_PREFIX + wc.submitSqoop(conf, command, args.toArray(new String[args.size()])));
其中wc.submitSqoop提交了sqoop的任務。
後續問題
- 1 任務提交到了哪裡?
- 2 在提交任務的時候都做了很麽?
- 3 如何在mapreduce開啟一個新的sqoop的?
- 4 為什麼在yarn中可以同時看到兩個應用,一個oozie,一個是sqoop