YARN基礎庫是其他一切模塊的基礎,它的設計直接決定了YARN的穩定性和擴展性,YARN借用了MRV1的一些底層基礎庫,比如RPC庫等,但因為引入了很多新的軟體設計方式,所以它的基礎庫更多,包括直接使用了開源序列化框架Protocol Buffers和Apache Avro,自定義的服務庫、事件庫和 ...
YARN基礎庫是其他一切模塊的基礎,它的設計直接決定了YARN的穩定性和擴展性,YARN借用了MRV1的一些底層基礎庫,比如RPC庫等,但因為引入了很多新的軟體設計方式,所以它的基礎庫更多,包括直接使用了開源序列化框架Protocol Buffers和Apache Avro,自定義的服務庫、事件庫和狀態機等
目錄 一. 概述 二. Protocol Buffers 三. Apache Avro
四. 底層通信庫
五. 服務庫與事件庫 六. 狀態機庫
一. 概述 Yarn基礎庫是其他一切模塊的基礎,它的設計直接決定了Yarn的穩定性和擴展性 Yarn的基礎庫主要有 :
- Protocol Buffers : Protocol Buffers是Google開源的序列化庫,具有平臺無關,高性能,相容好等優點.Yarn將ProtocolBuffers用到RPC通信中,預設情況下,Yarn RPC中所有參數採用Protocol Buffers進行序列化/反序列化
- Apache Avro : Avro是Hadoop生態系統中的RPC框架,具有平臺無關,支持動態模式等優點,Avro的最初設計動機是解決Yarn RPC相容性和擴展性差等問題
- RPC庫 : Yarn採用MR1中的RPC庫,但其中採用的預設序列化方法被替換成了Protocol Buffers
- 服務庫和事件庫 : Yarn將所有的對象服務化,以便統一管理(創建,銷毀等),而服務之間則採用事件機制進行通信
- 狀態機庫 : 狀態機是一種表示有限個狀態以及在這些狀態之間的轉移和動作等行為的數學模型
- 平臺無關,語言無關
- 高性能,解析速度是XML的20 ~ 100倍
- 體積小,文件大小僅是XML的1/10 ~ 1/3
- 使用簡單
- 相容性好
- 豐富的數據結構類型
- 快速可壓縮的二進位數據形式
- 存儲持久數據的文件容器
- 提供遠程過程調用RPC
- 簡單的動態語言結合功能
- 支持動態模式。Avro不需要生成代碼,有利於搭建通用的數據處理系統,避免了代碼入侵
- 數據無需加標簽
- 無需手工分配的域標識
YARN提供的對外類是Yarn RPC,用戶只需使用該類便可以構建一個基於HadoopRPC且採用Protocol Buffers序列化框架的通信協議 五. 服務庫與事件庫 服務庫 對於生命周期較長的對象,YARN採用了基於服務的對象管理模型對其進行管理,該模型主要有以下幾個特點 :
- 將每個被服務化的對象分為4個狀態 : NOTINITED(被創建),INITED(已初始化),STARTED(已啟動),STOPPED(已停止)
- 任何服務狀態變化都可以觸發另外一些動作
- 可通過組合的方式對任意服務進行組合,以便進行統一管理
public class TaskEvent extends AbstractEvent<TaskEventType> { private String taskID; public TaskEvent (String taskID, TaskEventType type) { super(type); this.taskID = taskID; } public String getTaskID() { return taskID; } } // Task事件類型定義 public enum TaskEventType { T_KILL, T_SCHEDULE }(2) 定義Job事件
public class JobEvent extends AbstractEvent<JobEventType> { private String jobID; public JobEvent (String jobID, JobEventType type) { super(type); this.jobID = jobID; } } //Job事件類型定義 public enum JobEventType { JOB_KILL, JOB_INIT, JOB_START }(3) 事件調度器 定義一個中央非同步調度器,它接收Job和Task兩種類型事件,並交給對應的事件處理器處理
@SuppressWarnings("unchecked") public class SimpleMRAppMaster extends CompositeService { private Dispatcher dispatcher; //中央非同步調度器 private String jobID; private int taskNumber; //該作業包含的任務數目 private String[] taskIDs; //該作業內部包含的所有任務 public SimpleMRAppMaster (String name, String jobID, int taskNumber) { super(name); this.jobID = jobID; this.taskNumber = taskNumber; taskIDs = new String[taskNumber]; for (int i = 0; i < taskNumber; i++) { taskIDs[i] = new String (jobID + "_task_" + i); } } public void serviceInit (final Configuration conf) throws Exception { dispatcher = new AsyncDispatcher(); //定義一個中央非同步調度器 //分別註冊Job和Task事件調度器 dispatcher.register(JobEventType.class, new JobEventDispatcher()); dispatcher.register(TaskEventType.class, new TaskEventDispatcher()); addService((Service)dispatcher); super.serviceInit(conf); } public Dispatcher getDispatcher() { return dispatcher; } private class JobEventDispatcher implements EventHandler<JobEvent> { @Override public void handle (JobEvent event) { if (event.getType() == JobEventType.JOB_KILL) { System.out.println("Receive JOB_KILL event, killing all the tasks"); for (int i = 0; i < taskNumber; i++) { dispatcher.getEventHandler().handle(new TaskEvent(taskIDs[i], TaskEventType.T_KILL)); } } else if (event.getType() == JobEventType.JOB_INIT) { System.out.println("Receive JOB_INIT event, scheduling tasks"); for (int i = 0; i < taskNumber; i++) { dispatcher.getEventHandler().handle(new TaskEvent(taskIDs[i], TaskEventType.T_SCHEDULE)); } } } } private class TaskEventDispatcher implements EventHandler<TaskEvent> { @Override public void handler (TaskEvent event) { if (event.getType() == TaskEventType.T_KILL) { System.out.println("Receive T_KILL event of task" + event.getTaskID()); } else if (event.getType() == TaskEventType.T_SCHEDULE) { System.out.println("Receive T_SCHEDULE of task" + event.getTaskID()); } } } }(4). 測試程式
@SuppressWarnings("unchecked") public class SimpleMRAppMasterTest { public static void main (String[] args) throws Exception { String jobID = "job_20131215_12"; SimpleMRAppMaster appMaster = new SimpleMRAppMaster("Simple MRAppMaster", jobID, 5); YarnConfiguration conf = new YarnConfiguration(new Configuration()); appMaster.serviceInit(conf); appMaster.serviceStart(); appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID, JobEventType.JOB_KILL)); appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID, JobEventType.JOB_INIT)); } }事件驅動帶來的變化 MRV1中,對象之間的作用關係是基於函數調用實現的,當一個對象向另外一個對象傳遞消息時,會直接採用函數調用的方式,且整個過程是串列的 基於函數調用的編程模型時低效的,它隱含著整個過程是串列、同步進行的。MRV2引入了事件驅動編程模型是一種更加高效的方式。 在基於事件驅動的編程模型中,所有對象被抽象成了事件處理器,而事件處理器之間通過事件相互關聯。 每種事件處理器處理一種類型的事件,同時根據需要觸發另外一種事件 相比於基於函數調用的編程模型,這種編程模型具有非同步、併發等特點,更加高效,因此更適合大型分散式系統 六. 狀態機庫 狀態機由一組狀態組成,這些狀態分為三類:初始狀態、中間狀態和最終狀態。狀態機從初始狀態開始運行,經過一系列中間狀態後,到達最終狀態並退出。在一個狀態機中,每個狀態都可以接收一組特定事件,並根據具體的事件類型轉換到另一個狀態。當狀態機轉換到最終狀態時,則退出 YARN狀態轉換方式 YARN中,每種狀態轉換由一個四元組表示,分別是轉換前狀態(preState)、轉換後狀態(postState)、事件(event)和回調函數(hook) YARN定義了三種狀態轉換方式 : (1) 一個初始狀態、一個最終狀態、一種事件。該方式表示狀態機在preState狀態下,接收到Event事件後,執行函數狀態轉移函數Hook,併在執行完成後將當前狀態轉換為postState 初始狀態:最終狀態:事件=1:1:1 (2) 一個初始狀態、多個最終狀態、一種事件。該方式表示狀態機在preState狀態下,接收到Event事件後,執行函數狀態轉移函數Hook,並將當前狀態轉移為Hook的返回值所表示的狀態 初始狀態:最終狀態:事件=1:N:1 (3) 一個初始狀態、一個最終狀態、多個事件。該方式表示狀態機在preState狀態下,接收到Event1、Event2和Event3中的任何一個事件,將執行函數狀態轉移函數Hook,併在執行完成後將當前狀態轉換成postState 初始狀態:最終狀態:事件=1:1:N 狀態機類 YARN自己實現了一個非常簡單的狀態機庫(位於包org.apache.hadcop.yarn.state中)。YARN對外提供了一個狀態機工廠StatemachineFactory,它提供多種addTransition方法供用戶添加各種狀態轉移,一旦狀態機添加完畢後,可通過調用installTopology完成一個狀態機的構建 我每天會寫文章記錄大數據技術學習之路,另外我自己整理了些大數據的學習資料,目前全部放在我的公眾號"SmallBird技術分享",加入我們一起學習交流,並且回覆'分享'會有大數據資源驚喜等著你~