用戶提交 MapReduce 作業後,JobClient 會調用 InputFormat 的 getSplit方法 生成 InputSplit 的信息。 一個 MapReduce 任務可以有多個 Split,其用於分割用戶的數據源,根據用戶設定的切割大小把數據源切割成 InputSplit元數據和 ...
用戶提交 MapReduce 作業後,JobClient 會調用 InputFormat 的 getSplit方法 生成 InputSplit 的信息。 一個 MapReduce 任務可以有多個 Split,其用於分割用戶的數據源,根據用戶設定的切割大小把數據源切割成 InputSplit元數據和 InputSplit原始數據。 元數據的作用:被JobTracker使用,生成Task的本地行的數據結構。 原始數據的作用:被Map Task初始化時使用,用來獲取要處理的數據。 以下開始對 class JobSplit 類進行分析: 一開始就載入meta的頭信息,主要用於構成Task列表的HEAD信息
static { try { META_SPLIT_FILE_HEADER = "META-SPL".getBytes("UTF-8"); } catch (UnsupportedEncodingException u) { throw new RuntimeException(u); } }2、New一個用於保存InputSplit元信息的數據結構
public static final TaskSplitMetaInfo EMPTY_TASK_SPLIT = new TaskSplitMetaInfo();JobSplit 封裝了讀寫InputSplit相關的基礎類。
![](http://images2017.cnblogs.com/blog/1209698/201710/1209698-20171015125232699-348669111.png)
private long startOffset:該InputSplit在job.split文件中的偏移量 private long inputDataLength:該 InputSplit 的長度 private String[] locations:該 InputSplit 所在的host 列表從這三個屬性可以使TaskTracker知道從哪裡讀取對應的元數據並得到真正的原始數據來處理。
public void readFields(DataInput in) throws IOException { int len = WritableUtils.readVInt(in); locations = new String[len]; for (int i = 0; i < locations.length; i++) { locations[i] = Text.readString(in); } startOffset = WritableUtils.readVLong(in); inputDataLength = WritableUtils.readVLong(in); } public void write(DataOutput out) throws IOException { WritableUtils.writeVInt(out, locations.length); for (int i = 0; i < locations.length; i++) { Text.writeString(out, locations[i]); } WritableUtils.writeVLong(out, startOffset); WritableUtils.writeVLong(out, inputDataLength); }在分析這兩個函數之前,先簡單複習序列化和反序列化的定義: 把對象轉換為位元組序列的過程稱為對象的序列化; 把位元組序列恢復為對象的過程稱為對象的反序列化。 對象的序列化主要有兩種用途:1)把對象的位元組序列永久地存儲在硬碟上,一般是文件。2)在網路上傳送對象的位元組序列。 接著通過查看源碼,很容易會發現 public static class SplitMetaInfo implements Writable 繼承了 Writable 介面,並且進入Writable 類查看得知,只要重寫 Writable 就能我們自己自定義 Split 出 InputSplit的格式。
public interface Writable { /** * Serialize the fields of this object to <code>out</code>. * * @param out <code>DataOuput</code> to serialize this object into. * @throws IOException */ void write(DataOutput out) throws IOException; /** * Deserialize the fields of this object from <code>in</code>. * * <p>For efficiency, implementations should attempt to re-use storage in the * existing object where possible.</p> * * @param in <code>DataInput</code> to deseriablize this object from. * @throws IOException */ void readFields(DataInput in) throws IOException;下麵再來看看 WritableUtils (可寫的工具類)這個類。
![](http://images2017.cnblogs.com/blog/1209698/201710/1209698-20171015125242965-2012883450.png)
private TaskSplitIndex splitIndex:Split元信息在 jib.split 文件中的位置 private long inputDataLength:InputSplit的數據長度 private String[] locations:InputSplit所在的host列表這三個信息是在作業初始化時,JobTracker從文件 job. splitmetainfo 文件獲得的。其中,host列表信息是任務調度判斷任務是否在本地的最重要因素。為什麼需要這個?一切是為了提高效率,節省集群的資源開銷。因為在集群中,為了容災容錯,數據一般是有多份備份的,每次TaskTracker要獲取數據處理時,為了提高工作效率,都是儘可能的從本地獲取數據,如果本地沒有想要的數據備份時才會從本地機架的不同節點獲取,再或者從不同機架的節點獲取數據。 3、TaskSplitIndex 用於在JobTracker向TaskTracker分配新任務時, 指定新任務待處理數據位置信息在文件 jib.split中的索引。 其包括兩個屬性:
private String splitLocation:job.split文件的位置 private long startOffset:InputSplit在 job.split 文件中的位置
public void readFields(DataInput in) throws IOException { splitLocation = Text.readString(in); startOffset = WritableUtils.readVLong(in); } public void write(DataOutput out) throws IOException { Text.writeString(out, splitLocation); WritableUtils.writeVLong(out, startOffset); }最後,JobSplit 包含的三個與Split相關的基礎類,規定瞭如何Split出元數據和原始數據,並且構造了一個Task Split的存儲列表供TaskTracker查詢,因此知道從哪裡得到數據來處理。