Storm基礎

来源:http://www.cnblogs.com/zpfbuaa/archive/2016/10/15/5964595.html
-Advertisement-
Play Games

Storm基本概念 Storm是一個開源的實時計算系統,它提供了一系列的基本元素用於進行計算:Topology、Stream、Spout、Bolt等等。 在Storm中,一個實時應用的計算任務被打包作為Topology發佈,這同Hadoop的MapReduce任務相似。但是有一點不同的是:在Hado ...


 

Storm基本概念

 

Storm是一個開源的實時計算系統,它提供了一系列的基本元素用於進行計算:Topology、Stream、Spout、Bolt等等。

 

在Storm中,一個實時應用的計算任務被打包作為Topology發佈,這同Hadoop的MapReduce任務相似。但是有一點不同的是:在Hadoop中,MapReduce任務最終會執行完成後結束;而在Storm中,Topology任務一旦提交後永遠不會結束,除非你顯示去停止任務。

 

計算任務Topology是由不同的Spouts和Bolts,通過數據流(Stream)連接起來的圖。下麵是一個Topology的結構示意圖:

 

 

其中包含有:

 

Spout:Storm中的消息源,用於為Topology生產消息(數據),一般是從外部數據源(如Message Queue、RDBMS、NoSQL、Realtime Log)不間斷地讀取數據併發送給Topology消息(tuple元組)。

 

Bolt:Storm中的消息處理者,用於為Topology進行消息的處理,Bolt可以執行過濾, 聚合, 查詢資料庫等操作,而且可以一級一級的進行處理。

 

最終,Topology會被提交到storm集群中運行;也可以通過命令停止Topology的運行,將Topology占用的計算資源歸還給Storm集群。

 

Storm數據流模型

 

數據流(Stream)是Storm中對數據進行的抽象,它是時間上無界的tuple元組序列。在Topology中,Spout是Stream的源頭,負責為Topology從特定數據源發射Stream;Bolt可以接收任意多個Stream作為輸入,然後進行數據的加工處理過程,如果需要,Bolt還可以發射出新的Stream給下級Bolt進行處理。

 

下麵是一個Topology內部Spout和Bolt之間的數據流關係:

 

 

Topology中每一個計算組件(Spout和Bolt)都有一個並行執行度,在創建Topology時可以進行指定,Storm會在集群內分配對應並行度個數的線程來同時執行這一組件。

 

那麼,有一個問題:既然對於一個Spout或Bolt,都會有多個task線程來運行,那麼如何在兩個組件(Spout和Bolt)之間發送tuple元組呢?

 

Storm提供了若幹種數據流分發(Stream Grouping)策略用來解決這一問題。在Topology定義時,需要為每個Bolt指定接收什麼樣的Stream作為其輸入(註:Spout並不需要接收Stream,只會發射Stream)。

 

目前Storm中提供了以下7種Stream Grouping策略:Shuffle Grouping、Fields Grouping、All Grouping、Global Grouping、Non Grouping、Direct Grouping、Local or shuffle grouping,具體策略可以參考這裡

 

一種Storm不能支持的場景

 

以上介紹了一些Storm中的基本概念,可以看出,Storm中Stream的概念是Topology內唯一的,只能在Topology內按照“發佈-訂閱”方式在不同的計算組件(Spout和Bolt)之間進行數據的流動,而Stream在Topology之間是無法流動的

 

 

這一點限制了Storm在一些場景下的應用,下麵通過一個簡單的實例來說明。

 

假設現在有一個Topology1的結構如下:通過Spout產生數據流後,依次需要經過Filter Bolt,Join Bolt,Business1 Bolt。其中,Filter Bolt用於對數據進行過濾,Join Bolt用於數據流的聚合,Business1 Bolt用於進行一個實際業務的計算邏輯。

 

 

目前這個Topology1已經被提交到Storm集群運行,而現在我們又有了新的需求,需要計算一個新的業務邏輯,而這個Topology的特點是和Topology1公用同樣的數據源,而且前期的預處理過程完全一樣(依次經歷Filter Bolt和Join Bolt),那麼這時候Storm怎麼來滿足這一需求?據個人瞭解,有以下幾種“曲折”的實現方式:

 

1)  第一種方式:首先kill掉已經在集群中運行的Topology1計算任務,然後實現Business2 Bolt的計算邏輯,並重新打包形成一個新的Topology計算任務jar包後,提交到Storm集群中重新運行,這時候Storm內的整體Topology結構如下:

 

 

這種方式的缺點在於:由於要重啟Topology,所以如果Spout或Bolt有狀態則會丟失掉;同時由於Topology結構發生了變化,因此重新運行Topology前需要對程式的穩定性、正確性進行驗證;另外Topology結構的變化也會帶來額外的運維開銷。

 

2)  第二種方式:完全開發部署一套新的Topology,其中前面的公共部分的Spout和Bolt可以直接復用,只需要重新開發新的計算邏輯Business2 Bolt來替換原有的Business1 Bolt即可。然後重新提交新的Topology運行。這時候Storm內的整體Topology結構如下:

 

 

這種方式的缺點在於:由於兩個Topology都會從External Data Source讀取同一份數據,無疑增加了External Data Source的負載壓力;而且會導致同樣的數據在Storm集群內被傳輸相同的兩份,被同樣的計算單元Bolt進行處理,浪費了Storm的計算資源和網路傳輸帶寬。假設現在不止有兩個這樣的Topology計算任務,而是有N個,那麼對Storm的計算Slot的浪費很嚴重。

 

註意:上述兩種方式還有一個公共的缺點——系統可擴展性不好,這意味著不管哪種方式,只要以後有這種新增業務邏輯的需求,都需要進行複雜的人工操作或線性的資源浪費現象。

 

3) 第三種方式:OK,看了以上兩種方式後,也許你會提出下麵的解決方案:通過Kafka這樣的消息中間件,實現不同Topology的Spout共用數據源,而且這樣可以做到消息可靠傳輸、消息rewind回傳等,好處是對於Storm來說,已經有了storm-kafka插件的支持。這時候Storm內的整體Topology結構如下:

 

 

這種實現方式可以通過引入一層消息中間件減少對External Data Source的重覆訪問的壓力,而且可以通過消息中間件層,屏蔽掉External Data Source的細節,如果需要擴展新的業務邏輯,只需要重新部署運行新的Topology,應該說是現有Storm版本下很好的實現方式了。不過消息中間件的引入,無疑將給系統帶來了一定的複雜性,這對於Storm上的應用開發來說提高了門檻。

 

值得註意的是,方案三中仍遺留有一點問題沒有解決:對於Storm集群來說,這種方式還是沒有能夠從根本上避免數據在Storm不同Topology內的重覆發送與處理。這是由於Storm的數據流模型上的限制所導致的,如果Storm實現了不同Topology之間Stream的共用,那麼這一問題也就迎刃而解了。

 

一個流處理系統的數據流模型

 

個人工作中有幸參與過一個流處理框架的開發與應用。下麵我們來簡單看看其中所採用的數據流模型:

 

 

其中:

 

1)數據流(data stream:時間分佈和數量上無限的一系列數據記錄的集合體;

 

2)數據記錄(data record:數據流的最小組成單元,每條數據記錄包括 3 類數據:所屬數據流名稱(stream name)、用於路由的數據(keys)和具體數據處理邏輯所需的數據(value);

 

3)數據處理任務定義(task definition:定義一個數據處理任務的基本屬性,無法直接被執行,必須特化為具體的任務實例。其基本屬性包括:

 

  • (可選)輸入流(input stream):描述該任務依賴哪些數據流作為輸入,是一個數據流名稱列表;數據流產生源不會依賴其他數據流,可忽略該配置;
  • 數據處理邏輯(process logic):描述該任務具體的處理邏輯,例如由獨立進程進行的外部處理邏輯;
  • (可選)輸出流(output stream):描述該任務產生哪個數據流,是一個數據流名稱;數據流處理鏈末級任務不會產生新的數據流,可忽略該配置;

 

4)數據處理任務實例(task instance:對一個數據處理任務定義進行具體約束後,可推送到某個處理結點上運行的邏輯實體。附加下列屬性:

 

  • 數據處理任務定義:指向該任務實例對應的數據處理任務定義實體;
  • 輸入流過濾條件(input filting condition):一個 boolean 表達式列表,描述每個輸入流中符合什麼條件的數據記錄可以作為有效數據交給處理邏輯;若某個輸入流中所有數據記錄都是有效數據,則可直接用 true 表示;
  • (可選)強制輸出周期(output interval):描述以什麼頻率強制該任務實例產生輸出流記錄,可以用輸入流記錄個數或間隔時間作為周期;忽略該配置時,輸出流記錄產生周期完全由處理邏輯自身決定,不受框架約束;

 

5)數據處理結點(node:可容納多個數據處理任務實例運行的實體機器,每個數據處理結點的IPv4地址必須保證唯一。

 

該分散式流處理系統由多個數據處理結點(node)組成;每個數據處理結點(node)上運行有多個數據任務實例(task instance);每個數據任務實例(task instance)屬於一個數據任務定義(task definition),任務實例是在任務定義的基礎上,添加了輸入流過濾條件和強制輸出周期屬性後,可實際推送到數據處理結點(node)上運行的邏輯實體;數據任務定義(task definition)包含輸入數據流、數據處理邏輯以及輸出數據流屬性。

 

該系統中,通過分散式應用程式協調服務ZooKeeper集群存儲以上數據流模型中的所有配置信息;不同的數據處理節點統一通過ZooKeeper集群獲取數據流的配置信息後進行任務實例的運行與停止、數據流的流入和流出。

 

同時,每個數據處理任務可以接受流系統中已存在的任意數據流(data stream)作為輸入,並產出新的任意名稱的數據流(data stream),被其他結點上運行的任務實例訂閱。不同結點之間對於各個數據流(data stream)的訂閱關係,通過ZooKeeper集群來動態感知並負責通知流系統做出變化。

 

二者在數據流模型上的不同之處

 

至於兩個系統的實現細節,我們先不去做具體比較,下麵僅列出二者在數據流模型上的一些不同之處(這裡並不是為了全面對比二者的不同之處,只是列出其中的關鍵部分):

 

1)  在Storm中,數據流Stream是在Topology內進行定義,併在Topology內進行傳輸的;而在上面提到的流處理系統中,數據流Stream是在整個系統內全局唯一的,可以在整個集群內被訂閱。

 

2)  在Storm中,數據流Stream的發佈和訂閱都是靜態的,所謂靜態是指數據流的發佈與訂閱關係在向Storm集群提交Topology計算任務時,被一次性生成的,這一關係在Topology的運行過程中是不能被改變的;而在上面提到的流處理系統中,數據流Stream的發佈和訂閱都是動態的,即數據處理任務task可以動態的發佈Stream,也可以動態的訂閱系統內已經生成的任意Stream,數據流的訂閱關於通過分散式應用程式協調服務ZooKeeper集群的動態節點來維護管理。

 

有了以上的對比,我們不難發現,對於本文所舉的應用場景實例,Storm的數據流模式尚不能很方便的支持,而在這裡提到的這個流處理系統的全局數據流模型下,這一應用場景的需求可以很方便的滿足。

 

1.什麼是Topology?
2.如何創建Topology?
3.Topology的worker數由誰來配置?
4.Topology中某個bolt的executor數由誰來指定?
5.Supervisor、worker、Executor、Task、Spout、Bolt之間的關係?








在創建Storm的Topology時,我們通常使用如下代碼:
builder.setBolt("cpp", new CppBolt(), 3).setNumTasks(5).noneGrouping(pre_name); 
Config conf = new Config(); 
conf.setNumWorkers(3); 
參數1:bolt名稱 "cpp"
參數2:bolt類型 CppBolt
參數3:bolt的並行數,parallelismNum,即運行topology時,該bolt的線程數
setNumTasks() 設置bolt的task數
noneGrouping()  設置輸入流方式及欄位
conf.setNumWorkers()設置worker數據。

經過多次試驗總結,得出如下結論:
1)Topology的worker數通過config設置,即執行該topology的worker(java)進程數。它可以通過storm rebalance 命令任意調整。
2) Topology中某個bolt的executor數,即parallelismNum,即執行該bolt的線程數,在setBolt時由第三個參數指定。它可以通過storm rebalance 命令調整,但最大不能超過該bolt的task數;
3) bolt的task數,通過setNumTasks()設置。(也可不設置,預設取bolt的executor數),無法在運行時調整。
4)Bolt實例數,這個比較特別,它和task數相等。有多少個task就會new 多少個Bolt對象。而這些Bolt對象在運行時由Bolt的thread進行調度。也即是說

builder.setBolt("cpp", new CppBolt(), 3).setNumTasks(5).noneGrouping(pre_name); 
會創建3個線程,但有記憶體中會5個CppBolt對象,三個線程調度5個對象。

Supervisor、worker、Executor、Task、Spout、Bolt之間的關係


每台Supervisor上運行著若幹個worker進程,在Configure對象中可以配置worker的數量,conf.setNumWorkers(number);

每個Workder進行上運行著若幹個Executor執行線程,就是所謂的Task任務。

在TopologyBuilder對象中可以配置Task的數量,topologyBuilder.setNumTasks(number);這些Task任務指的是Spout或者Bolt任務。

在TopologyBuilder對象中可以配置Spout、Bolt的任務的數量。

topologyBuilder.setSpout(“spout tag name”,new XxSpout(),number);
topologyBuilder.setBolt(“bolt tag name”,new XxBolt(),number);
預設情況下# executor = #tasks即一個Executor中運行著一個Task。Spout或者Bolt的Task個數一旦指定之後就不能改變了,而Executor的數量可以根據情況來進行動態的調整。
一句話介紹,每台worker node上可以運行很多個worker,每個worker會開闢很多Executor線程來執行Task。在Storm看來,spout和bolt都是task。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 介紹 為了避免在資料庫表結構設計過程中使用系統保留關鍵字我們必須知道資料庫存在哪些關鍵字,接下來會列出mysql、oracle、sqlserver三個資料庫各自的保留關鍵字。 Mysql http://dev.mysql.com/doc/refman/5.7/en/keywords.html ACC ...
  • 在SQL Server中,特殊的數據類型主要有三個,分別是:bit、sql_variant 和 sysname 一,bit bit類型,只有三個有效值:0,1 和 null,字元串true或false能夠隱式轉換為bit類型,true轉換為1,false轉換為0;任何非0的整數值轉換成bit類型時, ...
  • 以下的文章主要是向大家描述的是MySQL資料庫中delimiter的作用是什麼?我們一般都認為這個命令和存儲過程關係不大,到底是不是這樣的呢?以下的文章將會給你相關的知識,望你會有所收穫。 其實就是告訴MySQL解釋器,該段命令是否已經結束了,MySQL資料庫是否可以執行了。預設情況下,delimi ...
  • 上篇文章中介紹了 單變數線性回歸 ,為什麼說時單變數呢,因為它只有單個特征,其實在很多場景中只有單各特征時遠遠不夠的,當存在多個特征時,我們再使用之前的方法來求特征繫數時是非常麻煩的,需要一個特征繫數一個偏導式,而卻最要命的時特性的增長時及其迅猛的,幾十、幾百、幾千…… 單變數線性回歸: 多變數線性 ...
  • http://stackoverflow.com/questions/745538/create-function-through-mysqldb How can I define a multi-statement function or procedure in using the MySQLd ...
  • SQL2008還原備份的SQL2008的.bak文件時會報錯,大部分原因是因為當前登錄的版本是2005的,因而無法還原用SQL2008備份的數據,結局的辦法是去掉登錄的伺服器名稱後面的尾碼SQLEXPRESS,則登錄SQL2008,但直接登錄也可能會出現如下問題:provide命名管道提供程式,er ...
  • SELECT SUBSTRING(CONVERT(varchar(100),時間欄位, 22),0,15) AS aa FROM 表名 ...
  • C# .Net :Excel NPOI導入導出操作教程之將Excel文件讀取並寫到資料庫表,示例分享 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...