MapReduce之Job提交流程源碼和切片源碼分析

来源:https://www.cnblogs.com/kocdaniel/archive/2019/09/29/11609404.html
-Advertisement-
Play Games

hadoop2.7.2 MapReduce Job提交源碼及切片源碼分析 1. 首先從 函數進入 2. 進入 方法 3. 進入 方法 MapReduce作業提交時連接集群通過Job的Connect方法實現,它實際上是構造集群Cluster實例cluster cluster是連接MapReduce集群 ...


hadoop2.7.2 MapReduce Job提交源碼及切片源碼分析

  1. 首先從waitForCompletion函數進入
boolean result = job.waitForCompletion(true);
/**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the 
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    // 首先判斷state,當state為DEFINE時可以提交,進入 submit() 方法
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }
  1. 進入submit()方法
/**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    // 確認JobState狀態為可提交狀態,否則不能提交
    ensureState(JobState.DEFINE);
    // 設置使用最新的API
    setUseNewAPI();
    // 進入connect()方法,MapReduce作業提交時連接集群是通過Job類的connect()方法實現的,
    // 它實際上是構造集群Cluster實例cluster
    connect();
    // connect()方法執行完之後,定義提交者submitter
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        // 這裡的核心方法是submitJobInternal(),顧名思義,提交job的內部方法,實現了提交job的所有業務邏輯
          // 進入submitJobInternal
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    // 提交之後state狀態改變
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }
  1. 進入connect()方法
  • MapReduce作業提交時連接集群通過Job的Connect方法實現,它實際上是構造集群Cluster實例cluster
  • cluster是連接MapReduce集群的一種工具,提供了獲取MapReduce集群信息的方法
  • 在Cluster內部,有一個與集群進行通信的客戶端通信協議ClientProtocol的實例client,它由ClientProtocolProvider的靜態create()方法構造
  • 在create內部,Hadoop2.x中提供了兩種模式的ClientProtocol,分別為Yarn模式的YARNRunner和Local模式的LocalJobRunner,Cluster實際上是由它們負責與集群進行通信的
  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {// cluster提供了遠程獲取MapReduce的方法
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     // 只需關註這個Cluster()構造器,構造集群cluster實例
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }
  1. 進入Cluster()構造器
// 首先調用一個參數的構造器,間接調用兩個參數的構造器
public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    // 最重要的initialize方法
    initialize(jobTrackAddr, conf);
  }
  
// cluster中要關註的兩個成員變數是客戶端通訊協議提供者ClientProtocolProvider和客戶端通訊協議ClientProtocol實例client
  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    synchronized (frameworkLoader) {
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());
        ClientProtocol clientProtocol = null; 
        try {
          // 如果配置文件沒有配置YARN信息,則構建LocalRunner,MR任務本地運行
          // 如果配置文件有配置YARN信息,則構建YarnRunner,MR任務在YARN集群上運行
          if (jobTrackAddr == null) {
            // 客戶端通訊協議client是調用ClientProtocolProvider的create()方法實現
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }

          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol;
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          }
          else {
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        } 
        catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: ", e);
        }
      }
    }

    if (null == clientProtocolProvider || null == client) {
      throw new IOException(
          "Cannot initialize Cluster. Please check your configuration for "
              + MRConfig.FRAMEWORK_NAME
              + " and the correspond server addresses.");
    }
  }
  1. 進入submitJobInternal(),job的內部提交方法,用於提交job到集群
JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs 
    // 檢查結果的輸出路徑是否已經存在,如果存在會報異常
    checkSpecs(job);

    // conf裡邊是集群的xml配置文件信息
    Configuration conf = job.getConfiguration();
    // 添加MR框架到分散式緩存中
    addMRFrameworkToDistributedCache(conf);

    // 獲取提交執行時相關資源的臨時存放路徑
    // 參數未配置時預設是(工作空間根目錄下的)/tmp/hadoop-yarn/staging/提交作業用戶名/.staging
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {//記錄提交作業的主機IP、主機名,並且設置配置信息conf
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    // 獲取JobId
    JobID jobId = submitClient.getNewJobID();
    // 設置jobId
    job.setJobID(jobId);
    // 提交作業的路徑Path(Path parent, String child),會將兩個參數拼接為一個路徑
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    // job的狀態
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);
      
      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(SHUFFLE_KEY_LENGTH);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }
      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
        LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
                "data spill is enabled");
      }

      // 拷貝jar包到集群
      // 此方法中調用如下方法:rUploader.uploadFiles(job, jobSubmitDir);
      // uploadFiles方法將jar包拷貝到集群
      copyAndConfigureFiles(job, submitJobDir);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      // 計算切片,生成切片規劃文件
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write job file to submit dir
      writeConf(conf, submitJobFile);
      
      //
      // Now, actually submit the job (using the submit name)
      // 開始正式提交job
      printTokens(jobId, job.getCredentials());
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          jtFs.delete(submitJobDir, true);

      }
    }
  }
  1. 進入writeSplits(job, submitJobDir),計算切片,生成切片規劃文件
  • 內部會調用writeNewSplits(job, jobSubmitDir)方法
  • writeNewSplits(job, jobSubmitDir)內部定義了一個InputFormat類型的實例input
  • InputFormat主要作用
    • 驗證job的輸入規範
    • 對輸入的文件進行切分,形成多個InputSplit(切片)文件,每一個InputSplit對應著一個map任務(MapTask)
    • 將切片後的數據按照規則形成key,value鍵值對RecordReader
  • input調用getSplits()方法:List<InputSplit> splits = input.getSplits(job);
  1. 進入FileInputFormat類下的getSplits(job)方法
/** 
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
      
    // getFormatMinSplitSize()返回值固定為1,getMinSplitSize(job)返回job大小
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    // getMaxSplitSize(job)返回Lang類型的最大值
    long maxSize = getMaxSplitSize(job);

    // generate splits 生成切片
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    // 遍歷job下的所有文件
    for (FileStatus file: files) {
      // 獲取文件路徑
      Path path = file.getPath();
      // 獲取文件大小
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        // 判斷是否可分割
        if (isSplitable(job, path)) {
          // 獲取塊大小
          // 本地環境塊大小預設為32MB,YARN環境在hadoop2.x新版本為128MB,舊版本為64MB
          long blockSize = file.getBlockSize();
          // 計算切片的邏輯大小,預設等於塊大小
          // 返回值為:return Math.max(minSize, Math.min(maxSize, blockSize));
          // 其中minSize=1, maxSize=Long類型最大值, blockSize為切片大小
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          // 每次切片時就要判斷切片剩下的部分是否大於切片大小的SPLIT_SLOP(預設為1.1)倍,
          // 否則就不再切分,劃為一塊
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits;
  }
歡迎關註下方公眾號,獲取更多文章信息

1


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 遇到這種問題的時候我們需要考慮的是防火牆規則,因為防火牆預設是禁止所有埠訪問的,所以我們需要添加一個訪問埠來連接MySQL。 命令如下: 允許某埠 firewall-cmd --zone=public --add-port=“MySQL埠號”/tcp --permanent 禁止某埠 fi ...
  • https://blog.csdn.net/guohaosun/article/details/81481848 使用yum安裝軟體的時候出現:Another app is currently holding the yum lock; 其實就是yum鎖,就是你的yum被其他APP占用了。解決辦法就 ...
  • 一、前言 1、在perf監控進程的系統調用時,會出現大量swapper進程 2、官方描述該進程是當CPU上沒有其他任務運行時,就會執行swapper。換句話說swapper意味著CPU啥事也沒乾,跑去休息去了 3、本文來觀察一下swapper在cpu上的表現 二、環境準備 | 組件 | 版本 | | ...
  • 1、安裝nodejs,下載地址,http://nodejs.cn/download/,安裝過程直接點擊下一步即可 安裝完成後cmd輸入npm -v 查看當前安裝的npm的版本,如下圖提示所示則表示安裝成功。 2、查看當前npm的配置 cmd輸入npm config list 或者npm config ...
  • 前面的前奏已經分析介紹了建立內核頁表相關變數的設置準備,接下來轉入正題分析內核頁表的建立。 建立內核頁表的關鍵函數init_mem_mapping(): init_mem_mapping()裡面關鍵操作有三個split_mem_range()、kernel_physical_mapping_init ...
  • # 安裝適用於arm64位的nodejs runtime v10.16.3 mkdir /runtimes cd /runtimes wget https://nodejs.org/dist/v10.16.3/node-v10.16.3-linux-arm64.tar.xz tar -xvJf no... ...
  • # 以下示例適用於x64位runtime v3.0.0 mkdir /runtimes cd /runtimes wget https://download.visualstudio.microsoft.com/download/pr/b0c44e05-b7a1-4221-94ec-a0c0d3a1... ...
  • 背景 歸檔的表在源庫和目標庫都要存在 pt archiver歸檔表的場景有:不刪原表數據,非批量插入目標庫;不刪原表數據,批量插入目標庫;非批量刪除原表數據,非批量插入目標庫;批量刪除原表數據,批量插入目標庫 版本 pt archiver version pt archiver 3.0.12 sel ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...