strom-kafka的使用

来源:http://www.cnblogs.com/difeng/archive/2016/01/03/5097220.html
-Advertisement-
Play Games

storm kafka的使用 1.storm kafka介紹 storm kafka是storm自帶的從kafka上獲取消息的kafka客戶端程式。 提供kafka和Trident的spout實現從kafka消費數據。 2.storm kafka的使用實例 maven的依賴配置文件,要註意st...


storm-kafka的使用

1.storm-kafka介紹

storm-kafka是storm自帶的從kafka上獲取消息的kafka客戶端程式。
提供kafka和Trident的spout實現從kafka消費數據。

2.storm-kafka的使用實例

maven的依賴配置文件,要註意strom-kafka是使用的kafka的低級api,因此也要引用kafka的包。如果不引,雖然編譯不報錯,但運行時會報錯,我在初次使用時就是因為這個原因一直有問題。

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.5</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.5</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

下麵是我寫的一個demo
具體步驟如下
1.new BrokerHosts
需要的參數zookeeper的地址
2.new SpoutConfig
構建SpoutConfig,需要設置BrokerHosts,kafka的topic,strom在zookeeper上的根等相關的參數。
3.new TopologyBuilder
給TopologyBuilder設置Soupt和Boult用於構建一個Topology
4.配置Config並設置參數,啟動LocalCluster,提交topology任務。

import java.util.Arrays;
import java.util.Map;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

/**
 * @Description:strom-kafka 使用
 * @author:difeng
 * @time:2015年11月18日 上午10:18:31
 */
public class StormKafkaConsumer {
    
    public static class PingCounter extends BaseRichBolt{
        /**
         * 
         */
        private static final long serialVersionUID = 1L;
        private OutputCollector collector;
        @Override
        public void execute(Tuple input) {
            String msg = input.getString(0);
            System.out.println("---------------------" + msg + "-----------------");
            collector.ack(input);
        }

        @Override
        public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
            this.collector = arg2;
            System.out.println("++++++++++++++++++++prepare++++++++++++++++++++++++++++++++++");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer arg0) {
            // TODO Auto-generated method stub
             System.out.println("++++++++++++++++++++declareOutputFields+++++++++++++++++++++");
        }

    }

    public static void main(String[] args) {
        //zookeeper的伺服器地址
        String zks = "192.168.1.50:2181,192.168.1.57:2181,192.168.1.58:2181";
        //消息的topic
        String topic = "data_icmp_ping";
        //strom在zookeeper上的根
        String zkRoot = "/storm"; 
        String id = "data_icmp_ping";
        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.forceFromStart = true;
        spoutConf.zkServers = Arrays.asList(new String[] {"192.168.1.50,192.168.1.57,192.168.1.58"});
        spoutConf.zkPort = 2181;
        
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf),3); 
        builder.setBolt("ping-counter", new PingCounter(),3).shuffleGrouping("kafka-reader");
        Config conf = new Config();
        conf.setDebug(true);
        //設置任務線程數
        conf.setMaxTaskParallelism(1);
        
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", conf, builder.createTopology());
        try {
            Thread.sleep(60000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        cluster.shutdown();
    }
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1.先上圖 下麵是tcpdump的源碼。顏色根據自己的喜好配置,我比較喜歡亮的顏色,看的清楚!2.下載輔助配置文件 首先,我們下載一個vim的插件xterm-color-table.vim,下載地址:http://www.vim.org/scripts/sc...
  • 1.前言 編譯linux內核失敗的原因很多時候就是驅動選錯,適合自己本機的驅動沒編譯進去。面對特殊平臺(或者有些潔癖者,我就是^_^),要編譯精簡內核,只要本機驅動,其他都不需要。面對內核裡面這麼多驅動,不懂硬體的我們,看到都頭大。本人第一次安裝gent...
  • 1. 配置SSH遠程登錄 root@kali:~ vi /etc/ssh/sshd_config \ PermitRootLogin without password PermitRootLogin yes root@kali:~ service ssh restart 2. 設置開...
  • 首先,確保lamp環境已安裝好。準備好項目源代碼,資料庫備份文件等。由於沒有安裝好VNC,因此只能用ssh部署了。 將項目源代碼壓縮,Linux預設是支持SFTP的,用SFTP將源代碼壓縮包上傳到 /var/www/html 目錄下。關於FTP軟體,我覺得可以使用Notepad++的NPPFt...
  • OpenGL視窗 能用於OpenGL的視窗庫有很多,常見的有glut、freeglut、SDL、GLFW等。glut基本已經被廢棄,其他幾個再不同場合都有不同的應用。 GLFW為opengl superbible 6th & 7th中推薦的第三方開源視窗庫。GLFW的特點大概總結如下: 跨平臺,能在...
  • 1.編譯含有splice()函數的程式時出現,'SPLICE_F_MOVE' undeclared,'SPLICE_F_NONBLOCK' ‘SPLICE_F_MORE' 也是一樣undeclared!2.使用man splice查看,發現要定義巨集_GNU_SOURCE1 #define _GNU_...
  • 環境:Centos 6.5介紹:PhantomJS 是一個基於 WebKit 的伺服器端 JavaScript API。它全面支持web而不需瀏覽器支持,其快速,原生支持各種Web標準: DOM 處理, CSS 選擇器, JSON, Canvas, 和 SVG。 PhantomJS 可以用於 頁面自...
  • 網路編程中的關鍵問題總結總結下網路編程中關鍵的細節問題,包含連接建立、連接斷開、消息到達、發送消息等等;連接建立包括服務端接受 (accept) 新連接和客戶端成功發起 (connect) 連接。 accept接受連接的問題在本文最後會聊到,這裡談談connect的關鍵點; 使用非阻塞連接建...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...