通過查網上資料和鑽研HDFS源碼,理清HDFS的寫入機制和流程 ...
說明
除了標註之外,本文純屬原創,轉載請註明出處:https://www.jianshu.com/p/ea6ef5f5b868, https://www.cnblogs.com/monkeyteng/p/10220395.html
HDFS架構簡介
Hadoop的框架最核心的設計就是:HDFS和MapReduce。HDFS為海量的數據提供了存儲,則MapReduce為海量的數據提供了計算。本文基於Hadoop 2.7.3源碼,分析本地文件推送(新建/追加)到的HDFS客戶端邏輯。
- HDFS架構主要包含兩種類型的節點:NameNode和DataNode。
- NameNode,其實就是名位元組點,其功能類似於我們常用的磁碟文件系統中的inode。對於HDFS而言,NameNode相當於“目錄管理器”和“inode表”。
- NameNode保存兩類關鍵的映射表:
- 名字空間表:從文件名到數據塊(DataBlock)的映射,這部分數據保存在NameNode伺服器的磁碟。
- inode表:從數據塊(DataBlock)到機器的映射,包括每一個數據塊保存在哪一個或者哪幾個機器上。這部分數據在每次重啟NameNode的時候都會和DataNode通訊並重建。
- 對於Hadoop 2.7.3而言,一個DataBlock預設是128MB,所以一個文件可能需要N個DataBlock來存儲,那麼名字空間表很可能是一個文件名映射到一個DataBlock的數組。
- 關於這兩張表如何協作定位文件:
- 當使用文件名訪問文件時,NameNode會查詢名字空間表,根據這個文件名獲取它所有內容對應的DataBlock列表(是不是很類似於單機磁碟的數據訪問)。此時inode表會查詢每一個DataBlock的信息,包括它所在的位置(DataNode的IP+埠)、DataBlock的ID和時間戳以及裡面數據的長度(<=128MB)等。
- 這個DataBlock列表返回到客戶端,客戶端根據每個DataBlock上的信息(線索),分別連接到每個DataNode上,獲取上面存儲的數據。
- 客戶端與NameNode、NameNode與DataNode的連接,全部都是通過ProtoBuf的RPC調用來實現的。關於ProtoBuf可以參考這裡。例如,下麵就是追加文件的append請求的RPC協議:
//摘自hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
//RPC請求
rpc append(AppendRequestProto) returns(AppendResponseProto);
//請求報文
message AppendRequestProto {
required string src = 1;
required string clientName = 2;
optional uint32 flag = 3; // bits set using CreateFlag
}
//應答報文
message AppendResponseProto {
optional LocatedBlockProto block = 1;
optional HdfsFileStatusProto stat = 2;
}
HDFS寫文件Pipeline機制
HDFS在對文件的寫入方面,只允許數據追加到文件末尾,而不允許在文件中間修改文件。因為在文件中間修改文件,需要涉及文件鎖、數據塊之類的比較複雜的邏輯。
Hadoop的文件按照DataBlock分塊,並以DataBlock為單位做冗餘(負載均衡)。HDFS可以指定一個複製因數(replication),預設是保存3份,根據dfs.replication
配置項配置。
下麵分析HDFS寫文件的Pipeline流程(藍色線表示用於通訊,紅色線表示數據的傳輸路線):
- ①客戶端發送請求到NameNode,請求寫文件/新建數據塊。
- NameNode收到請求後,會給客戶端分配一個數據塊,其ID是
blk_123456
,並指明DataBlock各個拷貝所在的各個DataNode的IP和埠(圖中是分別存在於三個DataNode中)。 - 這一系列的DataNode稱為Pipeline,也就是數據傳輸的管道,也就是【DataNode_1:50010, DataNode_2:50010, DataNode_3:50010】。
- ②客戶端收到數據塊的信息,開始對DataNode發起寫的請求,請求報文包括要寫的數據塊,要寫的數據大小等等。請求成功後,發送數據到第一個DataNode,也就是圖中的DataNode_1,在該請求中包含DataBlock各個拷貝的地址(包含DataNode2和DataNode3的地址):【DataNode_1:50010, DataNode_2:50010, DataNode_3:50010】,發送完成之後等待DataNode_1返回的ACK報文。
- ③DataNode_1收到數據後,保存數據,並把數據發送到DataNode_2,Pipeline修改為【DataNode_2:50010, DataNode_3:50010】,發送完成之後等待DataNode_2返回的ACK報文。
- ④DataNode_2收到數據後,把數據發送到DataNode_3,Pipeline修改為【DataNode_3:50010】,發送完成之後等待DataNode_2返回的ACK報文。
- ⑤DataNode_3發現Pipeline中只有自己,不再有下游的DataNode節點,於是處理完成之後只需要返回ACK到Pipeline的上游節點,即DataNode_2。
- ⑥DataNode_2收到DataNode_3的ACK,於是把ACK發送到Pipeline的上游節點,即DataNode_1。
- ⑦DataNode_1收到DataNode_2的ACK,把ACK發送到Pipeline的上游節點,即客戶端。
數據發送至此完成。
HDFS文件推送客戶端
要把本地文件推送到HDFS,可以通過以下兩個命令實現:
hadoop fs -appendToFile <localsrc> ... <dst>
hadoop fs -put [-f] [-p] [-l] <localsrc> ... <dst>
跟蹤調用堆棧發現,這兩個命令最終是調用DFSOutputStream.java
中的代碼實現文件的拷貝。
輔助發送的相關類和數據結構
這份代碼裡面包含了一些用於輔助發送的類:
DFSOutputStream
:實現了發送數據的主流程,最主要是繼承自FSOutputSummer
這個虛擬類的介面方法writeChunk
。DataStreamer
:繼承自Daemon
的後臺線程,主要實現數據的流式發送。ResponseProcessor
:同樣繼承自Daemon
的後臺線程,主要實現對已發送數據包的ACK報文的接收。
還有一些保存發送數據相關信息的數據結構:
DFSPacket
:表示發送出去的一個數據包,包含相應的請求頭部以及相關標誌位。LinkedList<DFSPacket> dataQueue
:用於保存待發送的數據包。它是主線程*DFSOutputStream
和發送線程DataStreamer
之間生產者-消費者關係的共用數據結構。LinkedList<DFSPacket> ackQueue
:用於保存已經發送的數據包。發出去的數據包還要等待DataNode返回ACK才可以被認為是發送成功。它是發送線程DataStreamer
與ACK接收線程ResponseProcessor
之間生產者-消費者關係的共用數據結構。BlockConstructionStage stage
:這是一個狀態變數,整個發送流程就相當於一個狀態機。
看完上面的數據結構,整個數據發送流程就很明顯了:
DFSOutputStream
把數據組裝成DFSPacket
對象,放入dataQueue
;然後等待發送線程DataStreamer
發送到DataNode;DataStreamer
發送之後,把DFSPacket
對象移動到ackQueue
,等待ACK線程ResponseProcessor
在收到對應的ACK之後把該DFSPacket
從隊列移除。
下麵主要分析DFSOutputStream.java
這個客戶端代碼的執行流程。
#### 數據發送的主要流程
newStreamForCreate/newStreamForAppend
這兩個靜態函數用於創建DFSOutputStream
對象。一個是用於新建文件,一個用於追加到現有的文件。兩個函數主要差別在於,前者需要新建一個文件(發送create的RPC請求到NameNode),後者直接通過發送append的RPC請求到NameNode,在返回報文中獲取文件最後的一個數據塊並開始寫入。newStreamForCreate/newStreamForAppend
這兩個函數返回一個DFSOutputStream
的對象,然後被org.apache.hadoop.io.IOUtils.copyBytes()
調用DFSOutputStream
的writeChunk
介面函數,把本地的數據塊發送出去。下麵主要看writeChunk
函數。這個函數的參數主要包括數據的緩衝區、要發送的數據在DataBlock中的offset、還有數據的校驗等。