Hadoop 2.7.2 MapReduce: Job Submission and Input-Split Source Code Analysis
- Start from the waitForCompletion() function:
boolean result = job.waitForCompletion(true);
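For orientation, this call normally sits at the very end of a driver's main(). A minimal sketch, with hypothetical class names (WordCountDriver, WordCountMapper, WordCountReducer) and placeholder paths, imports omitted as in the rest of this walkthrough:

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCountDriver.class);          // hypothetical driver class
job.setMapperClass(WordCountMapper.class);         // hypothetical mapper
job.setReducerClass(WordCountReducer.class);       // hypothetical reducer
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("/input"));    // placeholder paths
FileOutputFormat.setOutputPath(job, new Path("/output"));
boolean result = job.waitForCompletion(true);      // the call traced below
System.exit(result ? 0 : 1);

The waitForCompletion() source: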
/**
* Submit the job to the cluster and wait for it to finish.
* @param verbose print the progress to the user
* @return true if the job succeeded
* @throws IOException thrown if the communication with the
* <code>JobTracker</code> is lost
*/
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
    // Check the state first: the job can only be submitted while state is DEFINE; enter submit()
if (state == JobState.DEFINE) {
submit();
}
if (verbose) {
monitorAndPrintJob();
} else {
// get the completion poll interval from the client.
int completionPollIntervalMillis =
Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();
}
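When verbose is false, the client simply sleeps between isComplete() polls. The interval comes from the client configuration; in Hadoop 2.x the key is mapreduce.client.completion.pollinterval with, to the best of my reading of MRJobConfig, a default of 5000 ms. A client that wants snappier progress checks can tune it before submitting:

// Poll job status every second instead of the default (assumed 5000 ms)
conf.setInt("mapreduce.client.completion.pollinterval", 1000);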
- Enter the submit() method
/**
* Submit the job to the cluster and return immediately.
* @throws IOException
*/
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
  // Ensure the JobState is DEFINE (i.e., submittable); otherwise submission is rejected
ensureState(JobState.DEFINE);
  // Choose the new API (unless the old mapred API is explicitly configured)
setUseNewAPI();
  // Enter connect(): connecting to the cluster at job-submission time goes through Job's connect() method,
  // which in effect constructs the Cluster instance 'cluster'
connect();
  // Once connect() has run, create the submitter
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
      // The core method here is submitJobInternal(), which, as the name suggests, implements all the logic of submitting a job
      // Enter submitJobInternal
return submitter.submitJobInternal(Job.this, cluster);
}
});
  // After submission, the state changes
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
- Enter the connect() method
- When a MapReduce job is submitted, connecting to the cluster goes through Job's connect() method, which in effect constructs the Cluster instance cluster
- Cluster is the tool for connecting to a MapReduce cluster; it provides methods for retrieving cluster information
- Inside Cluster there is a client communication protocol instance, client, of type ClientProtocol, built by the static create() method of a ClientProtocolProvider
- Inside create(), Hadoop 2.x provides two flavors of ClientProtocol: YARNRunner for YARN mode and LocalJobRunner for local mode; these are what Cluster actually uses to talk to the cluster (a sketch of the selection logic follows the initialize() code below)
private synchronized void connect()
throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {  // cluster provides remote access to the MapReduce framework
cluster =
ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run()
throws IOException, InterruptedException,
ClassNotFoundException {
            // Focus on this Cluster() constructor, which builds the cluster instance
return new Cluster(getConfiguration());
}
});
}
}
- Enter the Cluster() constructor
// The one-argument constructor delegates to the two-argument constructor
public Cluster(Configuration conf) throws IOException {
this(null, conf);
}
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
    // The key call: initialize()
initialize(jobTrackAddr, conf);
}
// cluster中要關註的兩個成員變數是客戶端通訊協議提供者ClientProtocolProvider和客戶端通訊協議ClientProtocol實例client
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
synchronized (frameworkLoader) {
for (ClientProtocolProvider provider : frameworkLoader) {
LOG.debug("Trying ClientProtocolProvider : "
+ provider.getClass().getName());
ClientProtocol clientProtocol = null;
try {
          // If YARN is not configured, build a LocalJobRunner and run the MR job locally
          // If YARN is configured, build a YARNRunner and run the MR job on the YARN cluster
if (jobTrackAddr == null) {
            // The client protocol is created by calling the provider's create() method
clientProtocol = provider.create(conf);
} else {
clientProtocol = provider.create(jobTrackAddr, conf);
}
if (clientProtocol != null) {
clientProtocolProvider = provider;
client = clientProtocol;
LOG.debug("Picked " + provider.getClass().getName()
+ " as the ClientProtocolProvider");
break;
}
else {
LOG.debug("Cannot pick " + provider.getClass().getName()
+ " as the ClientProtocolProvider - returned null protocol");
}
}
catch (Exception e) {
LOG.info("Failed to use " + provider.getClass().getName()
+ " due to error: ", e);
}
}
}
if (null == clientProtocolProvider || null == client) {
throw new IOException(
"Cannot initialize Cluster. Please check your configuration for "
+ MRConfig.FRAMEWORK_NAME
+ " and the correspond server addresses.");
}
}
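Which provider gets picked is driven by mapreduce.framework.name (MRConfig.FRAMEWORK_NAME). The YARN-side create() looks roughly like this, abridged from YarnClientProtocolProvider in Hadoop 2.7.x; LocalClientProtocolProvider performs the analogous check for the value "local":

// YarnClientProtocolProvider.create(conf), abridged
public ClientProtocol create(Configuration conf) throws IOException {
  // Only claim the configuration when mapreduce.framework.name == "yarn"
  if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
    return new YARNRunner(conf);
  }
  return null;  // returning null lets the next provider try
}

In practice this means a job runs on YARN as soon as mapred-site.xml (or the job conf) sets mapreduce.framework.name to yarn.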
- Enter submitJobInternal(), the job's internal submission method, which submits the job to the cluster
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
//validate the jobs output specs
    // Check the job's output path; if it already exists, an exception is thrown
checkSpecs(job);
    // conf holds the cluster's XML configuration
Configuration conf = job.getConfiguration();
    // Add the MR framework to the distributed cache
addMRFrameworkToDistributedCache(conf);
    // Get the staging path where resources are placed for submission
    // When not configured it defaults to /tmp/hadoop-yarn/staging/<submitting user>/.staging (under the workspace root)
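    // (an aside, based on my reading of the YARN defaults: in YARN mode this root is governed by yarn.app.mapreduce.am.staging-dir)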
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//configure the command line options correctly on the submitting dfs
InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {  // record the submitting host's IP and hostname into conf
submitHostAddress = ip.getHostAddress();
submitHostName = ip.getHostName();
conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
}
    // Get a new JobID from the cluster
JobID jobId = submitClient.getNewJobID();
    // Set it on the job
job.setJobID(jobId);
    // Submit path: Path(Path parent, String child) joins its two arguments into one path
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    // The job's status
JobStatus status = null;
try {
conf.set(MRJobConfig.USER_NAME,
UserGroupInformation.getCurrentUser().getShortUserName());
conf.set("hadoop.http.filter.initializers",
"org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
LOG.debug("Configuring job " + jobId + " with " + submitJobDir
+ " as the submit dir");
// get delegation token for the dir
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { submitJobDir }, conf);
populateTokenCache(conf, job.getCredentials());
// generate a secret to authenticate shuffle transfers
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
KeyGenerator keyGen;
try {
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
keyGen.init(SHUFFLE_KEY_LENGTH);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Error generating shuffle secret key", e);
}
SecretKey shuffleKey = keyGen.generateKey();
TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
job.getCredentials());
}
if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
"data spill is enabled");
}
      // Copy the job jar (and other resources) to the cluster
      // Inside, this calls rUploader.uploadFiles(job, jobSubmitDir),
      // which uploads the jar to the cluster
copyAndConfigureFiles(job, submitJobDir);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      // Compute the input splits and generate the split files
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
// write "queue admins of the queue to which job is being submitted"
// to job file.
String queue = conf.get(MRJobConfig.QUEUE_NAME,
JobConf.DEFAULT_QUEUE_NAME);
AccessControlList acl = submitClient.getQueueAdmins(queue);
conf.set(toFullPropertyName(queue,
QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
// removing jobtoken referrals before copying the jobconf to HDFS
// as the tasks don't need this setting, actually they may break
// because of it if present as the referral will point to a
// different job.
TokenCache.cleanUpTokenReferral(conf);
if (conf.getBoolean(
MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
// Add HDFS tracking ids
ArrayList<String> trackingIds = new ArrayList<String>();
for (Token<? extends TokenIdentifier> t :
job.getCredentials().getAllTokens()) {
trackingIds.add(t.decodeIdentifier().getTrackingId());
}
conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
trackingIds.toArray(new String[trackingIds.size()]));
}
// Set reservation info if it exists
ReservationId reservationId = job.getReservationId();
if (reservationId != null) {
conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
}
// Write job file to submit dir
writeConf(conf, submitJobFile);
//
// Now, actually submit the job (using the submit name)
printTokens(jobId, job.getCredentials());
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
} finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
jtFs.delete(submitJobDir, true);
}
}
}
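By the time submitClient.submitJob() is called, the submit dir holds everything the ApplicationMaster needs. The JobSubmissionFiles helpers name the individual artifacts; the file names below follow the Hadoop 2.x layout:

// Artifacts placed under submitJobDir during submission (Hadoop 2.x layout)
Path jobJar       = JobSubmissionFiles.getJobJar(submitJobDir);           // job.jar, from copyAndConfigureFiles()
Path jobSplit     = JobSubmissionFiles.getJobSplitFile(submitJobDir);     // job.split, from writeSplits()
Path jobSplitMeta = JobSubmissionFiles.getJobSplitMetaFile(submitJobDir); // job.splitmetainfo, from writeSplits()
Path jobConf      = JobSubmissionFiles.getJobConfPath(submitJobDir);      // job.xml, from writeConf()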
- Enter writeSplits(job, submitJobDir), which computes the input splits and generates the split files
- Internally it calls the writeNewSplits(job, jobSubmitDir) method, which creates an instance input of type InputFormat (see the sketch after this list). The main responsibilities of InputFormat:
- Validate the input specification of the job
- Split the input files into multiple InputSplit files (splits); each InputSplit corresponds to one map task (MapTask)
- Supply the RecordReader that turns the data in each split into key/value pairs
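For reference, writeNewSplits() itself is short. Abridged from JobSubmitter in Hadoop 2.7.x, it instantiates the configured InputFormat reflectively, collects the splits, sorts them largest-first, and writes the split files into the submit dir:

@SuppressWarnings("unchecked")
private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir)
    throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  // Instantiate the InputFormat configured for the job (TextInputFormat by default)
  InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
  // Sort the splits by size so that the biggest go first
  Arrays.sort(array, new SplitComparator());
  // Write job.split and job.splitmetainfo into the submit dir
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;  // number of splits == number of MapTasks
}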
- input calls the getSplits() method:
List<InputSplit> splits = input.getSplits(job);
- Enter the getSplits(job) method of the FileInputFormat class
/**
* Generate the list of files and make them into FileSplits.
* @param job the job context
* @throws IOException
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
    // getFormatMinSplitSize() always returns 1; getMinSplitSize(job) returns the configured minimum split size (mapreduce.input.fileinputformat.split.minsize, default 1)
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    // getMaxSplitSize(job) returns the configured maximum split size (mapreduce.input.fileinputformat.split.maxsize, default Long.MAX_VALUE)
long maxSize = getMaxSplitSize(job);
    // generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
    // Iterate over every input file of the job
for (FileStatus file: files) {
      // File path
Path path = file.getPath();
      // File length in bytes
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
        // Can this file be split?
if (isSplitable(job, path)) {
          // Block size of this file
          // Defaults: 32 MB for the local filesystem; on HDFS, 128 MB in newer Hadoop 2.x versions (64 MB in older versions)
long blockSize = file.getBlockSize();
          // Compute the logical split size, which defaults to the block size
          // The return value is: Math.max(minSize, Math.min(maxSize, blockSize))
          // With minSize = 1 and maxSize = Long.MAX_VALUE, this evaluates to blockSize, i.e., split size == block size by default
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
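          // Illustrative numbers (not from the source): with a 128 MB block,
          // splitSize = max(1, min(Long.MAX_VALUE, 128 MB)) = 128 MB.
          // Lower mapreduce.input.fileinputformat.split.maxsize for smaller splits;
          // raise mapreduce.input.fileinputformat.split.minsize for larger ones.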
long bytesRemaining = length;
          // Before each cut, check whether the remaining bytes exceed SPLIT_SLOP (default 1.1) times the split size;
          // if not, stop cutting and put the whole remainder into one split
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
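To make the SPLIT_SLOP rule concrete, here is a small, self-contained sketch (file sizes are made up for illustration) that replays the loop above and counts the splits it would produce:

// Standalone illustration of FileInputFormat's split loop
public class SplitMathDemo {
  static final double SPLIT_SLOP = 1.1;  // same constant as FileInputFormat

  public static void main(String[] args) {
    long splitSize = 128L * 1024 * 1024;  // assume splitSize == blockSize == 128 MB
    long[] fileLengths = {
        300L * 1024 * 1024,  // 300 MB -> three splits: 128 MB, 128 MB, 44 MB
        130L * 1024 * 1024   // 130 MB -> ONE split, since 130/128 <= 1.1
    };
    for (long length : fileLengths) {
      int count = 0;
      long bytesRemaining = length;
      // Same loop shape as getSplits(): cut only while the remainder
      // is more than 1.1x the split size
      while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
        count++;
        bytesRemaining -= splitSize;
      }
      if (bytesRemaining != 0) {
        count++;  // the tail becomes one (possibly slightly oversized) split
      }
      System.out.println((length / (1024 * 1024)) + " MB -> " + count + " split(s)");
    }
  }
}

The 130 MB case is the point of SPLIT_SLOP: rather than scheduling a MapTask for a 2 MB tail, the last split is allowed to grow up to 10% beyond the nominal split size.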