Druid.io(以下簡稱Druid)是2013年底開源出來的, 主要解決的是對實時數據以及較近時間的歷史數據的多維查詢提供高併發(多用戶),低延時,高可靠性的問題。 Druid簡介: Druid是一個為在大數據集之上做實時統計分析而設計的開源數據存儲。這個系統集合了一個面向列存儲的層,一個分散式、 ...
Druid.io(以下簡稱Druid)是2013年底開源出來的, 主要解決的是對實時數據以及較近時間的歷史數據的多維查詢提供高併發(多用戶),低延時,高可靠性的問題。
Druid簡介:
- Druid是一個為在大數據集之上做實時統計分析而設計的開源數據存儲。這個系統集合了一個面向列存儲的層,一個分散式、shared-nothing的架構,和一個高級的索引結構,來達成在秒級以內對十億行級別的表進行任意的探索分析。
- 互聯網技術的快速增長催生了各類大體量的數據,Hadoop很大的貢獻在於幫助企業將他們那些低價值的事件流數據轉化為高價值的聚合數據,這適用於各種應用
- 但Hadoop擅長的是存儲和獲取大規模數據,但是它並不提供任何性能上的保證它能多快獲取到數據。此外,雖然Hadoop是一個高可用的系統,但是在高併發負載下性能會下降
- Hadoop是一個很好的後端、批量處理和數據倉庫系統。在一個需要高併發並且保證查詢性能和數據可用性的並需要提供產品級別的保證的需求,Hadoop並不能滿足,因此創建了Druid,一個開源的、分散式、列存儲、實時分析的數據存儲。在許多方面,Druid和其他OLAP系統有很多相似之處,互動式查詢系統,記憶體資料庫(MMDB),眾所周知的分散式數據存儲。其中的分散式和查詢模型都參考了當前的一些搜索引擎的基礎架構.
druid的一些特點:
- Druid是一個開源的,分散式的,列存儲的,適用於實時數據分析的系統,文檔詳細,易於上手,Druid的一些特性總結如下;
- Druid支持亞秒級的OLAP查詢分析,Druid採用了列式存儲/倒排索引/點陣圖索引等關鍵技術,能夠在亞秒級別內完成海量數據的過濾/聚合以及多位分析等操作。Druid使用Bitmap indexing加速column-store的查詢速度,使用了一個叫做CONCISE的演算法來對bitmap indexing進行壓縮,使得生成的segments比原始文本文件小很多
- Druid的高可用性和高擴展性,Druid採用分散式,SN(share-nothing)架構,管理類節點課配置HA,工作節點功能單一,不互相依賴,耦合性低,各種節點掛掉都不會使Druid停止工作,例如如果不需要streaming data ingestion完全可以忽略realtime node,這些都是的Druid在集群的管理,容災,容錯,擴容等方面變得非常容易;
- 實時流數據分析。區別於傳統分析型資料庫採用的批量導入數據進行分析的方式,Druid提供了實時流數據分析,採用LSM(Long structure merge)-Tree結構使Druid擁有極高的實時寫入性能;同時實現了實時數據在亞秒級內的可視化。
- 豐富的數據分析功能。針對不同用戶群體,Druid提供了友好的可視化界面、類SQL查詢語言以及REST 查詢介面。
- Druid的一些“局限”:
- Segment的不可修改性簡化了Druid的實現,但是如果你有修改數據的需求,必須重新創建segment,而bitmap indexing的過程是比較耗時的;
- Druid能接受的數據的格式相對簡單,比如不能處理嵌套結構的數據
Druid使用場景:
- 1:適用於清洗好的記錄實時錄入,但不需要更新操作
- 2:支持寬表,不用join的方式(換句話說就是一張單表)
- 3:可以總結出基礎的統計指標,可以用一個欄位表示
- 4:對時區和時間維度(year、month、week、day、hour等)要求高的(甚至到分鐘級別)
- 5:實時性很重要
- 6:對數據質量的敏感度不高
- 7:用於定位效果分析和策略決策參考
Druid本身包含5個組成部分:
Broker nodes, Historical nodes, Realtime nodes, Coordinator Nodes和indexing services. 分別的作用如下:
- Broker nodes: 負責響應外部的查詢請求,通過查詢Zookeeper將請求劃分成segments分別轉發給Historical和Real-time nodes,最終合併並返回查詢結果給外部;
- Historial nodes: 負責’Historical’ segments的存儲和查詢。其會從deep storage中load segments,並響應Broder nodes的請求。Historical nodes通常會在本機同步deep storage上的部分segments,所以即使deep storage不可訪問了,Historical nodes還是能serve其同步的segments的查詢;
- Real-time nodes: 用於存儲和查詢熱數據,會定期地將數據build成segments移到Historical nodes。一般會使用外部依賴kafka來提高realtime data ingestion的可用性。如果不需要實時ingest數據到cluter中,可以捨棄Real-time nodes,只定時地batch ingestion數據到deep storage;
- Coordinator nodes: 可以認為是Druid中的master,其通過Zookeeper管理Historical和Real-time nodes,且通過Mysql中的metadata管理Segments
- Druid中通常還會起一些indexing services用於數據導入,batch data和streaming data都可以通過給indexing services發請求來導入數據。
Broker node
- Broker節點扮演著歷史節點和實時節點的查詢路由的角色。
- Broker節點知道發佈於Zookeeper中的關於哪些segment是可查詢的和這些segment是保存在哪裡的,Broker節點就可以將到來的查詢請求路由到正確的歷史節點或者是實時節點,
- Broker節點也會將歷史節點和實時節點的局部結果進行合併,然後返回最終的合併後的結果給調用者
緩存:Broker節點包含一個支持LRU失效策略的緩存。這個緩存可以使用本地堆記憶體或者是一個外部的分散式 key/value 存儲,例如Memcached
- 每次Broker節點接收到查詢請求時,都會先將查詢映射到一組segment中去。這一組確定的segment的結果可能已經存在於緩存中,而不需要重新計算。
- 對於那些不存在於緩存的結果,Broker節點會將查詢轉發到正確的歷史節點和實時節點中去,一旦歷史節點返回結果,Broker節點會將這些結果緩存起來以供以後使用,這個過程如下圖所示
- 註意:實時數據永遠不會被緩存,因此查詢實時節點的數據的查詢請求總是會被轉發到實時節點上去。實時數據是不斷變化的,因此緩存實時數據是不可靠的
- 上圖:結果會為每一個segment緩存。查詢會合併緩存結果與歷史節點和實時節點的計算結果
- 緩存也可作為數據可用性的附加級別。在所有歷史節點都出現故障的情況下,對於那些命中已經在緩存中緩存了結果的查詢,仍然是可以返回查詢結果的
可用性:在所有的Zookeeper都中斷的情況下,數據仍然是可以查詢的。如果Broker節點不可以和Zookeeper進行通信了,它會使用它最後一次得到的整個集群的視圖來繼續將查詢請求轉發到歷史節點和實時節點,Broker節點假定集群的結構和Zookeeper中斷前是一致的。在實踐中,在我們診斷Zookeeper的故障的時候,這種可用性模型使得Druid集群可以繼續提供查詢服務,為我們爭取了更多的時間
說明:通常在ShareNothing的架構中,如果一個節點變得不可用了,會有一個服務將下線的這個節點的數據搬遷到其他節點,但是如果這個節點下線後又立即重啟,而如果服務在一下線的時候就開始搬遷數據,是會產生跨集群的數據傳輸,實際上是沒有必要的。因為分散式文件系統對同一份數據會有多個副本,搬遷數據實際上是為了滿足副本數.而下線又重啟的節點上的數據不會有什麼丟失的,因此短期的副本不足並不會影響整體的數據健康狀況.何況跨機器搬遷數據也需要一定的時間,何不如給定一段時間如果它真的死了,才開始搬遷
Historical node
- 歷史節點封裝了載入和處理由實時節點創建的不可變數據塊(segment)的功能。在很多現實世界的工作流程中,大部分導入到Druid集群中的數據都是不可變的,因此,歷史節點通常是Druid集群中的主要工作組件。
- 歷史節點遵循shared-nothing的架構,因此節點間沒有單點問題。節點間是相互獨立的並且提供的服務也是簡單的,它們只需要知道如何載入、刪除和處理不可變的segment (註:shared nothing architecture是一 種分散式計算架構,這種架構中不存在集中存儲的狀態,整個系統中沒有資源競爭,這種架構具有非常強的擴張性,在web應用中廣泛使用)
- 類似於實時節點,歷史節點在Zookeeper中通告它們的線上狀態和為哪些數據提供服務。載入和刪除segment的指令會通過Zookeeper來進行發佈,指令會包含segment保存在deep storage的什麼地方和怎麼解壓、處理這些segment的相關信息
- 在歷史節點從deep storage下載某一segment之前,它會先檢查本地緩存信息中看segment是否已經存在於節點中,如果segment還不存在緩存中,歷史節點會從deep storage中下載segment到本地
- 一旦處理完成,這個segment就會在Zookeeper中進行通告。此時,這個segment就可以被查詢了。歷史節點的本地緩存也支持歷史節點的快速更新和重啟,在啟動的時候,該節點會檢查它的緩存,併為任何它找到的數據立刻進行服務的提供,如下圖:
- 歷史節點從deep storage下載不可變的segment。segment在可以被查詢之前必須要先載入到記憶體中
- 歷史節點可以支持讀一致性,因為它們只處理不可變的數據。不可變的數據塊同時支持一個簡單的並行模型:歷史節點可以以非阻塞的方式併發地去掃描和聚合不可變的數據塊
Tiers: 歷史節點可以分組到不同的tier中,哪些節點會被分到一個tier中是可配置的。Tier的目的是可以根據segment的重要程度來分配高或低的優先順序來進行數據的分佈。
- 可以為不同的tier配置不同的性能和容錯參數。例如,可以使用一批很多個核的CPU和大容量記憶體的節點來組成一個“熱點數據”的tier,這個“熱點數據”集群可以配置來用於下載更多經常被查詢的數據。
- 一個類似的”冷數據”集群可以使用一些性能要差一些的硬體來創建,“冷數據”集群可以只包含一些不是經常訪問的segment
可用性: 歷史節點依賴於Zookeeper來管理segment的載入和卸載。
- 如果Zookeeper變得不可用的時候,歷史節點就不再可以為新的數據提供服務和卸載過期的數據,因為是通過HTTP來為查詢提供服務的
- 對於那些查詢它當前已經在提供服務的數據,歷史節點仍然可以進行響應。這意味著Zookeeper運行故障時不會影響那些已經存在於歷史節點的數據的可用性。
Coordinator node
- 主要負責數據的管理和在歷史節點上的分佈。協調節點告訴歷史節點載入新數據、卸載過期數據、複製數據、和為了負載均衡移動數據。
- Druid為了維持穩定的視圖,使用一個多版本的併發控制交換協議來管理不可變的segment。如果任何不可變的segment包含的數據已經被新的segment完全淘汰了,則過期的segment會從集群中卸載掉。
- 協調節點會經歷一個leader選舉的過程,來決定由一個獨立的節點來執行協調功能,其餘的協調節點則作為冗餘備份節點
- 協調節點會周期性(一分鐘)的執行來確定集群的當前狀態,它通過在運行的時候對比集群的預期狀態和集群的實際狀態來做決定。和所有的Druid節點一樣,協調節點維持一個和Zookeeper的連接來獲取當前集群的信息(數據拓撲圖、元信息庫中所有有效的Segment信息以及規則庫)
- 協調節點也維持一個與MySQL資料庫的連接,MySQL包含有更多的操作參數和配置信息。
- 其中一個存在於MySQL的關鍵信息就是歷史節點可以提供服務的所有segment的一個清單,這個表可以由任何可以創建segment的服務進行更新,例如實時節點。
- MySQL資料庫中還包含一個Rule表來控制集群中segment的是如何創建、銷毀和複製
Rules:Rules管理歷史segment是如何在集群中載入和卸載的。
- Rules指示segment應該如何分配到不同的歷史節點tier中,每一個tier中應該保存多少份segment的副本。
- Rules還可能指示segment何時應該從集群中完全地卸載。Rules通常設定為一段時間,例如,一個用戶可能使用Rules來將最近一個月的有價值的segment載入到一個“熱點數據”的集群中,最近一年的有價值的數據載入到一個“冷數據”的集群中,而將更早時間前的數據都卸載掉。
- 協調節點從MySQL資料庫中的rule表載入一組rules。Rules可能被指定到一個特定的數據源,或者配置一組預設的rules。協調節點會迴圈所有可用segment並會匹配第一條適用於它的rule
負載均衡:在典型的生產環境中,查詢通常命中數十甚至上百個segment,由於每個歷史節點的資源是有限的,segment必須被分佈到整個集群中,以確保集群的負載不會過於不平衡。
- 要確定最佳的負載分佈,需要對查詢模式和速度有一定的瞭解。通常,查詢會覆蓋一個獨立數據源中最近的一段鄰近時間的一批segment。平均來說,查詢更小的segment則更快
- 這些查詢模式提出以更高的比率對歷史segment進行複製,把大的segment以時間相近的形式分散到多個不同的歷史節點中,並且使存在於不同數據源的segment集中在一起
- 為了使集群中segment達到最佳的分佈和均衡,根據segment的數據源、新舊程度、和大小,開發了一個基於成本的優化程式
副本/複製(Replication):
- 協調節點可能會告訴不同的歷史節點載入同一個segment的副本。每一個歷史節點tier中副本的數量是完全可配置。
- 設置一個高級別容錯性的集群可以設置一個比較高數量的副本數。segment的副本被視為和原始segment一樣的,並使用相同的負載均衡演算法
- 通過複製segment,單一歷史節點故障對於整個Druid集群來說是透明的,不會有任何影響
可用性:
- 協調節點有Zookeeper和MySQL這兩個額外的依賴,協調節點依賴Zookeeper來確定集群中有哪些歷史節點
- 如果Zookeeper變為不可用,協調節點將不可以再進行segment的分配、均衡和卸載指令的發送。不過,這些都不會影響數據的可用性
- 對於MySQL和Zookeeper響應失效的設計原則是一致的:如果協調節點一個額外的依賴響應失敗了,集群會維持現狀
- Druid使用MySQL來存儲操作管理信息和關於segment如何存在於集群中的segment元數據。如果MySQL下線了,這些信息就在協調節點中變得不可用,不過這不代表數據不可用
- 如果協調節點不可以和MySQL進行通信,他們會停止分配新的segment和卸載過期的segment。在MySQL故障期間Broker節點、歷史節點、實時節點都是仍然可以查詢的
Realtime node
- 實時節點封裝了導入和查詢事件數據的功能,經由這些節點導入的事件數據可以立刻被查詢。
- 實時節點只關心一小段時間內的事件數據,並定期把這段時間內收集的這批不可變事件數據導入到Druid集群裡面另外一個專門負責處理不可變的批量數據的節點中去。
- 實時節點通過Zookeeper的協調和Druid集群的其他節點協調工作。實時節點通過Zookeeper來宣佈他們的線上狀態和他們提供的數據
- 實時節點為所有傳入的事件數據維持一個記憶體中的索引緩存, 隨著事件數據的傳入,這些索引會逐步遞增,並且這些索引是可以立即查詢的,查詢這些緩存於JVM的基於堆的緩存中的事件數據,Druid就表現得和行存儲一樣
- 為了避免堆溢出問題,實時節點會定期地、或者在達到設定的最大行限制的時候,把記憶體中的索引持久化到磁碟去
- 這個持久化進程會把保存於記憶體緩存中的數據轉換為基於列存儲的格式,所有持久化的索引都是不可變的,並且實時節點會載入這些索引到off-heap記憶體中使得它們可以繼續被查詢
- 上圖實時節點緩存事件數據到記憶體中的索引上,然後有規律的持久化到磁碟上。在轉移之前,持久化的索引會周期性地合併在一起。查詢會同時命中記憶體中的和已持久化的索引
- 所有的實時節點都會周期性的啟動後臺的計劃任務搜索本地的持久化索引,後臺計劃任務將這些持久化的索引合併到一起並生成一塊不可變的數據,這些數據塊包含了一段時間內的所有已經由實時節點導入的事件數據,我們稱這些數據塊為”Segment”。在傳送階段,實時節點將這些segment上傳到一個永久持久化的備份存儲中,通常是一個分散式文件系統,例如S3或者HDFS,Druid稱之為”Deep Storage”。
實時節點處理流程:導入、持久化、合併和傳送這些階段都是流動的,並且在這些處理階段中不會有任何數據的丟失,數據流圖如下:
-
- 節點啟動於13:47,並且只會接受當前小時和下一小時的事件數據。當事件數據開始導入後,節點會宣佈它為13:00到14:00這個時間段的Segment數據提供服務
- 每10分鐘(這個時間間隔是可配置的),節點會將記憶體中的緩存數據刷到磁碟中進行持久化,在當前小時快結束的時候,節點會準備接收14:00到15:00的事件數據,一旦這個情況發生了,節點會準備好為下一個小時提供服務,並且會建立一個新的記憶體中的索引。
- 隨後,節點宣佈它也為14:00到15:00這個時段提供一個segment服務。節點並不是馬上就合併13:00到14:00這個時段的持久化索引,而是會等待一個可配置的視窗時間,直到所有的13:00到14:00這個時間段的一些延遲數據的到來。這個視窗期的時間將事件數據因延遲而導致的數據丟失減低到最小。
- 在視窗期結束時,節點會合併13:00到14:00這個時段的所有持久化的索引合併到一個獨立的不可變的segment中,並將這個segment傳送走,一旦這個segment在Druid集群中的其他地方載入了並可以查詢了,實時節點會刷新它收集的13:00到14:00這個時段的數據的信息,並且宣佈取消為這些數據提供服務。
Indexing server
Indexing Service是負責“生產”Segment的高可用、分散式、Master/Slave架構服務。主要由三類組件構成:負責運行索引任務(indexing task)的Peon,負責控制Peon的MiddleManager,負責任務分發給MiddleManager的Overlord;三者的關係可以解釋為:Overlord是MiddleManager的Master,而MiddleManager又是Peon的Master。其中,Overlord和MiddleManager可以分散式部署,但是Peon和MiddleManager預設在同一臺機器上
Overlord
Overlord負責接受任務、協調任務的分配、創建任務鎖以及收集、返回任務運行狀態給調用者。當集群中有多個Overlord時,則通過選舉演算法產生Leader,其他Follower作為備份。
Overlord可以運行在local(預設)和remote兩種模式下,如果運行在local模式下,則Overlord也負責Peon的創建與運行工作,當運行在remote模式下時,Overlord和MiddleManager各司其職,根據圖3.6所示,Overlord接受實時/批量數據流產生的索引任務,將任務信息註冊到Zookeeper的/task目錄下所有線上的MiddleManager對應的目錄中,由MiddleManager去感知產生的新任務,同時每個索引任務的狀態又會由Peon定期同步到Zookeeper中/Status目錄,供Overlord感知當前所有索引任務的運行狀況。
Overlord對外提供可視化界面,通過訪問http://:/console.html,我們可以觀察到集群內目前正在運行的所有索引任務、可用的Peon以及近期Peon完成的所有成功或者失敗的索引任務。
MiddleManager
MiddleManager負責接收Overlord分配的索引任務,同時創建新的進程用於啟動Peon來執行索引任務,每一個MiddleManager可以運行多個Peon實例。
在運行MiddleManager實例的機器上,我們可以在${ java.io.tmpdir}目錄下觀察到以XXX_index_XXX開頭的目錄,每一個目錄都對應一個Peon實例;同時restore.json文件中保存著當前所有運行著的索引任務信息,一方面用於記錄任務狀態,另一方面如果MiddleManager崩潰,可以利用該文件重啟索引任務。
Peon
Peon是Indexing Service的最小工作單元,也是索引任務的具體執行者,所有當前正在運行的Peon任務都可以通過Overlord提供的web可視化界面進行訪問。
架構介紹:
- 查詢路徑:紅色箭頭:①客戶端向Broker發起請求,Broker會將請求路由到②實時節點和③歷史節點
- Druid數據流轉:黑色箭頭:數據源包括實時流和批量數據. ④實時流經過索引直接寫到實時節點,⑤批量數據通過IndexService存儲到DeepStorage,⑥再由歷史節點載入. ⑦實時節點也可以將數據轉存到DeepStorage
- Druid的集群依賴了ZooKeeper來維護數據拓撲. 每個組件都會與ZooKeeper交互,如下:
- 實時節點在轉存Segment到DeepStorage, 會寫入自己轉存了什麼Segment
- 協調節點管理歷史節點,它負責從ZooKeeper中獲取要同步/下載的Segment,並指派任務給具體的歷史節點去完成
- 歷史節點從ZooKeeper中領取任務,任務完成後要將ZooKeeper條目刪除表示完成了任務
- Broker節點根據ZooKeeper中的Segment所在的節點, 將查詢請求路由到指定的節點
- 對於一個查詢路由路徑,Broker只會將請求分發到實時節點和歷史節點, 因此元數據存儲和DeepStorage都不會參與查詢中(看做是後臺的進程).
MetaData Storage 與 Zookeeper
- MetaStore和ZooKeeper中保存的信息是不一樣的. ZooKeeper中保存的是Segment屬於哪些節點. 而MetaStore則是保存Segment的元數據信息
- 為了使得一個Segment存在於集群中,MetaStore存儲的記錄是關於Segment的自描述元數據: Segment的元數據,大小,所在的DeepStorage
- 元數據存儲的數據會被協調節點用來知道集群中可用的數據應該有哪些(Segment可以通過實時節點轉存或者批量數據直接寫入).
Druid還包含3個外部依賴,與其說是依賴,不如說正式Druid開放的架構,用戶可以根據自己的需求使用不同的外部組建
- Mysql:存儲Druid中的各種metadata(裡面的數據都是Druid自身創建和插入的),在Druid_0.9.1.1版本中,元信息庫druid主要包含十張表,均以“druid_”開頭,例如張表:”druid_config”(通常是空的), “druid_rules”(coordinator nodes使用的一些規則信息,比如哪個segment從哪個node去load)和“druid_segments”(存儲每個segment的metadata信息);
- Deep storage: 存儲segments,Druid目前已經支持本地磁碟,NFS掛載磁碟,HDFS,S3等。Deep Storage的數據有2個來源,一個是batch Ingestion, 另一個是real-time nodes;
- ZooKeeper: Druid使用Zookeeper作為分散式集群內部的通信組件,各類節點通過Curator Framework將實例與服務註冊到Zookeeper上,同時將集群內需要共用的信息也存儲在Zookeeper目錄下,從而簡化集群內部自動連接管理、leader選舉、分散式鎖、path緩存以及分散式隊列等複雜邏輯。
- ① 實時數據寫入到實時節點,會創建索引結構的Segment.
- ② 實時節點的Segment經過一段時間會轉存到DeepStorage
- ③ 元數據寫入MySQL; 實時節點轉存的Segment會在ZooKeeper中新增一條記錄
- ④ 協調節點從MySQL獲取元數據,比如schema信息(維度列和指標列)
- ⑤ 協調節點監測ZK中有新分配/要刪除的Segment,寫入ZooKeeper信息:歷史節點需要載入/刪除Segment
- ⑥ 歷史節點監測ZK, 從ZooKeeper中得到要執行任務的Segment
- ⑦ 歷史節點從DeepStorage下載Segment並載入到記憶體/或者將已經保存的Segment刪除掉
- ⑧ 歷史節點的Segment可以用於Broker的查詢路由
- 由於各個節點和其他節點都是最小化解耦的, 所以下麵兩張圖分別表示實時節點和批量數據的流程:
- 數據從Kafka導入到實時節點, 客戶端直接查詢實時節點的數據.
- 批量數據使用IndexService,接收Post請求的任務,直接產生Segment寫到DeepStorage里.DeepStorage中的數據只會被歷史節點使用.
所以這裡要啟動的服務有: IndexService(overlord), Historical, Coordinator(協調節點通知歷史節點下載Segment)
說明:文章非原創,看了好多文章整理組合到一起的.
大數據相關的伙伴可以關註我的公眾號,分享大數據乾貨和麵試進階!