Storm學習筆記(1)Hello WordCount - 單機模式

来源:https://www.cnblogs.com/maksheiev/archive/2018/02/21/8457775.html
-Advertisement-
Play Games

古人雲,紙上得來終覺淺,絕知此事要躬行。翻譯過來,就是學東西哪有不踩坑的。 因為工作原因要折騰Storm,環境和第一個例子折騰了好久,搞完了回頭看,吐血的簡單。 Storm有兩種模式,單機和集群。入門當然選單機。 1、安裝JDK,配置Eclipse環境 2、建立一個Maven工程,在pom.xml加 ...


古人雲,紙上得來終覺淺,絕知此事要躬行。翻譯過來,就是學東西哪有不踩坑的。

因為工作原因要折騰Storm,環境和第一個例子折騰了好久,搞完了回頭看,吐血的簡單。

 

Storm有兩種模式,單機和集群。入門當然選單機。

1、安裝JDK,配置Eclipse環境

2、建立一個Maven工程,在pom.xml加上這段

<dependency>
     <groupId>org.apache.storm</groupId>
     <artifactId>storm-core</artifactId>
     <version>1.1.2</version>
     <scope>compile</scope>
</dependency>

3、通過Maven建立項目和下載依賴包。

其實,所需要的storm-core-1.1.2.jar可以從官網下載的storm包裡面的lib目錄中找到。

Java在下不熟悉,也就不多說了。

4、參考官方或者各種教程的word-count例子編個代碼。

5、在Eclipse裡面run起來就可以了。

什麼Storm, Zookeeper,其實在這個單機入門例子裡面,都是不需要的!

就這麼簡單。

 

具體代碼來說,官方提供的storm-starter例子中,WordCountTopology.java挺適合入門的。

只是裡面有個坑:

官方採用了python作為句子分割blot的實現,但是如果環境不具備的話,一跑就會出錯。

就是這段:

public static class SplitSentence extends ShellBolt implements IRichBolt {

  public SplitSentence() {
     super("python", "splitsentence.py");
   }

// 其餘部分略

 

可以用這個類來替代:

 1 public static class SplitSentence extends BaseBasicBolt{  
 2      @Override  
 3      public void execute(Tuple tuple, BasicOutputCollector collector){  
 4          // 接收到一個句子  
 5          String sentence = tuple.getString(0);  
 6          // 把句子切割為單詞  
 7          StringTokenizer iter = new StringTokenizer(sentence);  
 8          // 發送每一個單詞  
 9          while(iter.hasMoreElements()){  
10              collector.emit(new Values(iter.nextToken()));  
11          }  
12      }  
13        
14      @Override  
15      public void declareOutputFields(OutputFieldsDeclarer declarer){  
16          // 定義一個欄位  
17          declarer.declare(new Fields("word"));  
18      }  
19      
20      @Override
21      public Map<String, Object> getComponentConfiguration() {
22        return null;
23      }
24 } 
View Code

 

Run起來以後,在Eclipse的Console視窗裡面可以看到運行的詳情。

 

完整代碼如下:

  1 package storm.blueprints;
  2 
  3 import org.apache.storm.spout.SpoutOutputCollector;
  4 import org.apache.storm.task.TopologyContext;
  5 import org.apache.storm.topology.OutputFieldsDeclarer;
  6 import org.apache.storm.topology.base.BaseRichSpout;
  7 import org.apache.storm.tuple.Fields;
  8 import org.apache.storm.tuple.Values;
  9 
 10 import org.apache.storm.utils.Utils;
 11 import org.slf4j.Logger;
 12 import org.slf4j.LoggerFactory;
 13 
 14 import org.apache.storm.Config;  
 15 import org.apache.storm.LocalCluster;  
 16 import org.apache.storm.StormSubmitter;  
 17 import org.apache.storm.task.ShellBolt;  
 18    
 19 import org.apache.storm.topology.BasicOutputCollector;  
 20 import org.apache.storm.topology.IRichBolt;  
 21 import org.apache.storm.topology.TopologyBuilder;  
 22 import org.apache.storm.topology.base.BaseBasicBolt;  
 23    
 24 import org.apache.storm.tuple.Tuple;  
 25 import java.util.HashMap;
 26 import java.util.Map;
 27 
 28 
 29 import java.util.*;
 30 
 31 public class HelloWordCount 
 32 {
 33      public static class RandomSentenceSpout extends BaseRichSpout {
 34            private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class);
 35 
 36           SpoutOutputCollector _collector;
 37            Random _rand;
 38 
 39 
 40            @Override
 41            public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
 42              _collector = collector;
 43              _rand = new Random();
 44            }
 45 
 46           @Override
 47            public void nextTuple() {
 48              Utils.waitForMillis(100);//(100);
 49              String[] sentences = new String[]{
 50                      sentence("the cow jumped over the moon"),
 51                      sentence("an apple a day keeps the doctor away"),
 52                      sentence("four score and seven years ago"),
 53                      sentence("snow white and the seven dwarfs"),
 54                      sentence("i am at two with nature")};
 55              final String sentence = sentences[_rand.nextInt(sentences.length)];
 56 
 57             LOG.debug("Emitting tuple: {}", sentence);
 58 
 59             _collector.emit(new Values(sentence));
 60              
 61              System.out.println("***" + sentence);
 62            }
 63 
 64           protected String sentence(String input) {
 65              return input;
 66            }
 67 
 68           @Override
 69            public void ack(Object id) {
 70            }
 71 
 72           @Override
 73            public void fail(Object id) {
 74            }
 75 
 76           @Override
 77            public void declareOutputFields(OutputFieldsDeclarer declarer) {
 78              declarer.declare(new Fields("sentence"));
 79            }
 80      }
 81      
 82        
 83      // 定義個Bolt,用於將句子切分為單詞  
 84      public static class SplitSentence extends BaseBasicBolt{  
 85          @Override  
 86          public void execute(Tuple tuple, BasicOutputCollector collector){  
 87              // 接收到一個句子  
 88              String sentence = tuple.getString(0);  
 89              // 把句子切割為單詞  
 90              StringTokenizer iter = new StringTokenizer(sentence);  
 91              // 發送每一個單詞  
 92              while(iter.hasMoreElements()){  
 93                  collector.emit(new Values(iter.nextToken()));  
 94              }  
 95          }  
 96            
 97          @Override  
 98          public void declareOutputFields(OutputFieldsDeclarer declarer){  
 99              // 定義一個欄位  
100              declarer.declare(new Fields("word"));  
101          }  
102          
103          @Override
104          public Map<String, Object> getComponentConfiguration() {
105            return null;
106          }
107      }  
108        
109      // 定義一個Bolt,用於單詞計數  
110      public static class WordCount extends BaseBasicBolt {  
111          Map<String, Integer> counts = new HashMap<String, Integer>();  
112            
113          @Override  
114          public void execute(Tuple tuple, BasicOutputCollector collector){  
115              String word = tuple.getString(0);
116              Integer count = counts.get(word);
117              if (count == null)
118                count = 0;
119              count++;
120              counts.put(word, count);
121              
122              System.out.println(word +"  "+count);
123          }  
124            
125          @Override  
126          public void declareOutputFields(OutputFieldsDeclarer declarer){  
127              // 定義兩個欄位word和count  
128              declarer.declare(new Fields("word","count"));  
129          }  
130      }  
131      public static void main(String[] args) throws Exception   
132      {  
133          System.out.println("main");
134          // 創建一個拓撲  
135          TopologyBuilder builder = new TopologyBuilder();  
136          // 設置Spout,這個Spout的名字叫做"Spout",設置並行度為5  
137          builder.setSpout("Spout", new RandomSentenceSpout(), 5);  
138          // 設置slot——“split”,並行度為8,它的數據來源是spout的  
139          builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("Spout");  
140          // 設置slot——“count”,你並行度為12,它的數據來源是split的word欄位  
141          builder.setBolt("count", new WordCount(), 12).globalGrouping("split");//, new Fields("word"));  
142            
143          Config conf = new Config();  
144                
145              // 本地集群  
146              LocalCluster cluster = new LocalCluster();  
147                
148              System.out.println("LocalCluster");
149              
150              // 提交拓撲(該拓撲的名字叫word-count)  
151              cluster.submitTopology("word-count", conf, builder.createTopology() );  
152                
153              System.out.println("submitTopology");           
154              
155              Utils.waitForSeconds(10);
156              cluster.killTopology("word-count");
157              cluster.shutdown();
158              }  
159      }  
160      
161      public static class Utils {
162 
163         public static void waitForSeconds(int seconds) {
164              try {
165                  Thread.sleep(seconds * 1000);
166              } catch (InterruptedException e) {
167              }
168          }
169 
170         public static void waitForMillis(long milliseconds) {
171              try {
172                  Thread.sleep(milliseconds);
173              } catch (InterruptedException e) {
174              }
175          }
176      }
177 }
View Code

 

請使用手機"掃一掃"x


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 作為軟體開發人員,我們已知道思考如何將應用程式因數分解成組件部分。 這是對象導向、軟體抽象和組件化的中心模式。 現在,這種因數分解往往以共用庫和技術層之間的類與介面呈現。 通常採用一種分層方法,有後端存儲、中間層業務邏輯和前端用戶界面 (UI)。 過去幾年來的變化是身為開發人員的我們,開始為業務驅動 ...
  • 一致性演算法 是分散式系統中最重要的問題之一。錶面上看,這似乎很簡單,只是讓幾個節點在某些方面達成一致。在本篇之中,會帶大家完整的梳理分散式系統之中的共識演算法,來更加深刻的理解分散式系統的設計。 1.原子提交和兩階段提交(2PC) 原子提交防止了資料庫處於半更新的狀態,這對於需要滿足多對象事務和維護次 ...
  • 該模塊作用是完成Python數值和C語言結構體的Python字元串形式間的轉換。這可以用於處理存儲在文件中或從網路連接中存儲的二進位數據,以及其他數據源。 用途: 在Python基本數據類型和二進位數據之間進行轉換 模塊提供了用於在位元組字元串和Python原生數據類型之間轉換函數,比如數字和字元串。 ...
  • Time Limit: 4000/2000 MS (Java/Others) Memory Limit: 32768/32768 K (Java/Others)Total Submission(s): 5934 Accepted Submission(s): 1845 Problem Descrip ...
  • 現代信息系統應該是避不開大數據處理的。作為一個通用的系統集成工具也必須具備大數據存儲和讀取能力。cassandra是一種分散式的資料庫,具備了分散式資料庫高可用性(high-availability)特性,對於一個實時大型分散式集成系統來說是核心支柱。與傳統的關係資料庫對比,cassandra從數據 ...
  • 1 學習計劃 1、實現區域導入功能 n OCUpload一鍵上傳插件使用 n 將文件上傳到Action n POI簡介 n 使用POI解析Excel文件 n 完成資料庫操作 n 使用pinyin4J生成簡碼和城市編碼 2、區域分頁查詢 n 頁面調整 n 服務端實現 3、重構分頁代碼 n BaseAc ...
  • OGNL取值範圍分兩部分,root、Context兩部分 可以放置任何對象作為ROOT,CONTEXT中必須是Map鍵值對 示例: 準備工作: User類: package bean; public class User { private String name; private Integer ...
  • Swagger 是一個規範和完整的框架,用於生成、描述、調用和可視化 RESTful 風格的 Web 服務。總體目標是使客戶端和文件系統作為伺服器以同樣的速度來更新。文件的方法,參數和模型緊密集成到伺服器端的代碼,允許API與介面方法,參數等保存同步,大大減少了介面開發人員的工作量.這個例子是我本地 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...