最近利用閑暇時間,又重新研讀了一下Storm。認真對比了一下Hadoop,前者更擅長的是,實時流式數據處理,後者更擅長的是基於HDFS,通過MapReduce方式的離線數據分析計算。對於Hadoop,本身不擅長實時的數據分析處理。兩者的共同點都是分散式的架構,而且,都類似有主/從關係的概念。本文中我 ...
最近利用閑暇時間,又重新研讀了一下Storm。認真對比了一下Hadoop,前者更擅長的是,實時流式數據處理,後者更擅長的是基於HDFS,通過MapReduce方式的離線數據分析計算。對於Hadoop,本身不擅長實時的數據分析處理。兩者的共同點都是分散式的架構,而且,都類似有主/從關係的概念。本文中我就不具體闡述Storm集群和Zookeeper集群如何部署的問題,我想通過一個實際的案例切入,分析一下如何利用Storm,完成實時分析處理數據的。
Storm本身是Apache托管的開源的分散式實時計算系統,它的前身是Twitter Storm。在Storm問世以前,處理海量的實時數據信息,大部分是類似於使用消息隊列,加上工作進程/線程的方式。這使得構建這類的應用程式,變得異常的複雜。很多的業務邏輯中,你不得不考慮消息的發送和接收,線程之間的併發控制等等問題。而其中的業務邏輯可能只是占據整個應用的一小部分,而且很難做到業務邏輯的解耦。但是Storm的出現改變了這種局面,它首先抽象出數據流Stream的抽象概念,一個Stream指的是tuples組成的無邊界的序列。後面又繼續提出Spouts、Bolts的概念。Spouts在Storm裡面是數據源,專門負責生成流。而Bolts則是以流作為輸入,並重新生成流作為輸出,並且Bolts還會繼續指定它輸入的流應該如何劃分。最後Storm是通過拓撲(Topology)這種抽象概念,組織起若幹個Spouts、Bolts構成的分散式數據處理網路。Storm設計的時候,就有意的把Spouts、Bolts組成的拓撲(Topology)網路通過Thrift服務方式進行封裝,這個做法,使得Storm的Spouts、Bolts組件可以通過目前主流的任意語言實現,使得整個框架的相容性和擴展性更加的優秀。
在Storm裡面拓撲(Topology)的概念,非常類似Hadoop裡面MapReduce的Job的概念。不同的是Storm的拓撲(Topology)只要你啟動了,它就會一直運行下去,除非你kill掉;而MapReduce的Job最終它是會結束的。基於這樣的模式,使得Storm非常適合處理實時性的數據分析,持續計算,DRPC(分散式RPC)等。
好了,我就結合實際的案例,設計分析一下,如何利用Storm改善應用的處理性能。
移動公司的垃圾簡訊監控平臺,實時地上傳每個省的疑似垃圾簡訊用戶的垃圾簡訊內容文件,每個省則根據文件中垃圾簡訊的內容,解析過濾出,包含指定敏感關鍵字的垃圾簡訊進行入庫。被入庫的垃圾簡訊用戶被列為敏感用戶,是重點監控對象,畢竟亂髮這些垃圾簡訊是非常不對的。垃圾簡訊監控平臺生成的文件速度非常驚人,原來的傳統做法是,根據每個省的每一個地市,對應一個獨立應用,串列化地解析、過濾敏感關鍵字,來進行入庫處理。但是,從現狀來看,程式處理的性能並不高效,常常造成文件積壓,沒有及時處理入庫。
現在,我們就通過Storm,來重新梳理、組織一下上述的應用場景。
首先,我先說明一下,該案例中,Storm集群和Zookeeper集群的部署情況,如下圖所示:
Nimbus對應的主機是192.168.95.134是Storm主節點,其餘兩台從節點Supervisor對應的主機分別是192.168.95.135(主機名:slave1)、192.168.95.136(主機名:slave2)。同樣的,Zookeeper集群也是部署在上述節點上。Storm集群和Zookeeper集群會互相通信,因為Storm就是基於Zookeeper的。然後先啟動每個節點的Zookeeper服務,其次分別啟動Storm的Nimbus、Supervisor服務。具體可以到Storm安裝的bin目錄下麵啟動服務,啟動命令分別為storm nimbus > /dev/null 2 > &1 &和storm supervisor > /dev/null 2 > &1 &。然後用jps觀察啟動的效果。沒有問題的話,在Nimbus服務對應的主機上啟動Storm UI監控對應的服務,在Storm安裝目錄的bin目錄輸入命令:storm ui >/dev/null 2>&1 &。然後打開瀏覽器輸入:http://{Nimbus服務對應的主機ip}:8080,這裡就是輸入:http://192.168.95.134:8080/。觀察Storm集群的部署情況,如下圖所示:
可以發現,我們的Storm的版本是0.9.5,它的從節點(Supervisor)有2個,分別是slave1、slave2。一共的woker的數量是8個(Total slots)。Storm集群我們已經部署完畢,也啟動成功了。現在我們就利用Storm的方式,來重新改寫一下這種敏感信息實時監控過濾的應用。首先看下,Storm方式的拓撲結構圖:
其中的SensitiveFileReader-591、SensitiveFileReader-592(用戶簡訊採集器,分地市)代表的是Storm中的Spouts組件,表示一個數據的源頭,這裡是表示從伺服器的指定目錄下,讀取疑似垃圾簡訊用戶的垃圾簡訊內容文件。當然Spouts的組件你可以根據實際的需求,擴展出許多Spouts。
然後讀取出文件中每一行的內容之後,就是分析文件的內容組件了,這裡是指:SensitiveFileAnalyzer(監控簡訊內容拆解分析),它負責分析出文件的格式內容。
為了簡單演示起見,我這裡定義文件的格式為如下內容(隨便寫一個例子):home_city=591&user_id=5911000&msisdn=10000&sms_content=abc-slave1。每個列之間用&進行連接。其中home_city=591表示疑似垃圾簡訊的用戶歸屬地市編碼,591表示福州、592表示廈門;user_id=5911000表示疑似垃圾簡訊的用戶標識;msisdn=10000表示疑似垃圾簡訊的用戶手機號碼;sms_content=abc-slave1代表的就是垃圾簡訊的內容了。SensitiveFileAnalyzer代表的就是Storm中的Bolt組件,用來處理Spouts“流”出的數據。
最後,就是我們根據解析好的數據,匹配業務規定的敏感關鍵字,進行過濾入庫了。這裡我們是把過濾好的數據存入MySQL資料庫中。負責這項任務的組件是:SensitiveBatchBolt(敏感信息採集處理),當然它也是Storm中的Bolt組件。好了,以上就是完整的Storm拓撲(Topology)結構了。
現在,我們對於整個敏感信息採集過濾監控的拓撲結構,有了一個整體的瞭解之後,我們再來看下如何具體編碼實現!先來看下整個工程的代碼層次結構,它如下圖所示:
首先來看下,我們定義的敏感用戶的數據結構RubbishUsers,假設,我們要過濾的敏感用戶的簡訊內容中,要包含“racketeer”、“Bad”等敏感關鍵字。具體代碼如下:
/** * @filename:RubbishUsers.java * * Newland Co. Ltd. All rights reserved. * * @Description:敏感用戶實體定義 * @author tangjie * @version 1.0 * */ package newlandframework.storm.model; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; import java.io.Serializable; public class RubbishUsers implements Serializable { // 用戶歸屬地市編碼 private Integer homeCity; // 用戶編碼 private Integer userId; // 用戶號碼 private Integer msisdn; // 簡訊內容 String smsContent; public final static String HOMECITY_COLUMNNAME = "home_city"; public final static String USERID_COLUMNNAME = "user_id"; public final static String MSISDN_COLUMNNAME = "msisdn"; public final static String SMSCONTENT_COLUMNNAME = "sms_content"; public final static Integer[] SENSITIVE_HOMECITYS = new Integer[] { 591/* 福州 */, 592 /* 廈門 */}; // 敏感關鍵字,後續可以考慮單獨開闢放入緩存或資料庫中,這裡僅僅為了Demo演示 public final static String SENSITIVE_KEYWORD1 = "Bad"; public final static String SENSITIVE_KEYWORD2 = "racketeer"; public final static String[] SENSITIVE_KEYWORDS = new String[] { SENSITIVE_KEYWORD1, SENSITIVE_KEYWORD2 }; public Integer getHomeCity() { return homeCity; } public void setHomeCity(Integer homeCity) { this.homeCity = homeCity; } public Integer getUserId() { return userId; } public void setUserId(Integer userId) { this.userId = userId; } public Integer getMsisdn() { return msisdn; } public void setMsisdn(Integer msisdn) { this.msisdn = msisdn; } public String getSmsContent() { return smsContent; } public void setSmsContent(String smsContent) { this.smsContent = smsContent; } public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) .append("homeCity", homeCity).append("userId", userId) .append("msisdn", msisdn).append("smsContent", smsContent) .toString(); } }
現在,我們看下敏感信息數據源組件SensitiveFileReader的具體實現,它負責從伺服器的指定目錄下麵,讀取疑似垃圾簡訊用戶的垃圾簡訊內容文件,然後把每一行的數據,發送給下一個處理的Bolt(SensitiveFileAnalyzer),每個文件全部發送結束之後,在當前目錄中,把原文件重命名成尾碼bak的文件(當然,你可以重新建立一個備份目錄,專門用來存儲這種處理結束的文件),SensitiveFileReader的具體實現如下:
/** * @filename:SensitiveFileReader.java * * Newland Co. Ltd. All rights reserved. * * @Description:用戶簡訊採集器 * @author tangjie * @version 1.0 * */ package newlandframework.storm.spout; import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.FileFilterUtils; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class SensitiveFileReader extends BaseRichSpout { // 福州地市用戶敏感簡訊文件上傳路徑 public static final String InputFuZhouPath = "/home/tj/data/591"; // 廈門地市用戶敏感簡訊文件上傳路徑 public static final String InputXiaMenPath = "/home/tj/data/592"; // 處理成功改成bak尾碼 public static final String FinishFileSuffix = ".bak"; private String sensitiveFilePath = ""; private SpoutOutputCollector collector; public SensitiveFileReader(String sensitiveFilePath) { this.sensitiveFilePath = sensitiveFilePath; } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } @Override public void nextTuple() { Collection<File> files = FileUtils.listFiles( new File(sensitiveFilePath), FileFilterUtils.notFileFilter(FileFilterUtils .suffixFileFilter(FinishFileSuffix)), null); for (File f : files) { try { List<String> lines = FileUtils.readLines(f, "GBK"); for (String line : lines) { System.out.println("[SensitiveTrace]:" + line); collector.emit(new Values(line)); } FileUtils.moveFile(f, new File(f.getPath() + System.currentTimeMillis() + FinishFileSuffix)); } catch (IOException e) { e.printStackTrace(); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sensitive")); } }
監控簡訊內容拆解分析器SensitiveFileAnalyzer,這個Bolt組件,接收到數據源SensitiveFileReader的數據之後,就按照上面定義的格式,對文件中每一行的內容進行解析,然後把解析完畢的內容,繼續發送給下一個Bolt組件:SensitiveBatchBolt(敏感信息採集處理)。現在,我們來看下SensitiveFileAnalyzer這個Bolt組件的實現:
/** * @filename:SensitiveFileAnalyzer.java * * Newland Co. Ltd. All rights reserved. * * @Description:監控簡訊內容拆解分析 * @author tangjie * @version 1.0 * */ package newlandframework.storm.bolt; import java.util.Map; import newlandframework.storm.model.RubbishUsers; import org.apache.storm.guava.base.Splitter; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class SensitiveFileAnalyzer extends BaseBasicBolt { @Override public void execute(Tuple input, BasicOutputCollector collector) { String line = input.getString(0); Map<String, String> join = Splitter.on("&").withKeyValueSeparator("=").split(line); collector.emit(new Values((String) join .get(RubbishUsers.HOMECITY_COLUMNNAME), (String) join .get(RubbishUsers.USERID_COLUMNNAME), (String) join .get(RubbishUsers.MSISDN_COLUMNNAME), (String) join .get(RubbishUsers.SMSCONTENT_COLUMNNAME))); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields(RubbishUsers.HOMECITY_COLUMNNAME, RubbishUsers.USERID_COLUMNNAME, RubbishUsers.MSISDN_COLUMNNAME, RubbishUsers.SMSCONTENT_COLUMNNAME)); } }
最後一個Bolt組件SensitiveBatchBolt(敏感信息採集處理)根據上游Bolt組件SensitiveFileAnalyzer發送過來的數據,然後跟業務規定的敏感關鍵字進行匹配,如果匹配成功,說明這個用戶,就是我們要重點監控的用戶,我們把他,通過hibernate採集到MySQL資料庫,統一管理。最後要說明的是,SensitiveBatchBolt組件還實現了一個監控的功能,就是定期列印出,我們已經採集到的敏感信息用戶數據。現在給出SensitiveBatchBolt的實現:
/** * @filename:SensitiveBatchBolt.java * * Newland Co. Ltd. All rights reserved. * * @Description:敏感信息採集處理 * @author tangjie * @version 1.0 * */ package newlandframework.storm.bolt; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; import org.apache.commons.collections.Predicate; import org.apache.commons.collections.iterators.FilterIterator; import org.apache.commons.lang.StringUtils; import org.hibernate.Criteria; import org.hibernate.HibernateException; import org.hibernate.Session; import org.hibernate.SessionFactory; import org.hibernate.criterion.MatchMode; import org.hibernate.criterion.Restrictions; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import newlandframework.storm.model.RubbishUsers; public class SensitiveBatchBolt implements IBasicBolt { // Hibernate配置載入 private final static String HIBERNATE_APPLICATIONCONTEXT = "newlandframework/storm/resource/jdbc-hibernate-bean.xml"; // Spring、Hibernate上下文不要序列化 private static transient ApplicationContext hibernate = new ClassPathXmlApplicationContext( HIBERNATE_APPLICATIONCONTEXT); private static transient SessionFactory sessionFactory = (SessionFactory) hibernate .getBean("sessionFactory"); public SensitiveBatchBolt() throws SQLException { super(); } private static List list = new ArrayList(Arrays.asList(RubbishUsers.SENSITIVE_KEYWORDS)); // 敏感信息數據源,可以考慮放入緩存或者資料庫中載入判斷 private class SensitivePredicate implements Predicate { private String sensitiveWord = null; SensitivePredicate(String sensitiveWord) { this.sensitiveWord = sensitiveWord; } public boolean evaluate(Object object) { return this.sensitiveWord.contains((String) object); } } // Monitor線程定期列印監控採集處理情況 class SensitiveMonitorThread implements Runnable { private int sensitiveMonitorTimeInterval = 0; private Session session = null; SensitiveMonitorThread(int sensitiveMonitorTimeInterval) { this.sensitiveMonitorTimeInterval = sensitiveMonitorTimeInterval; session = sessionFactory.openSession(); } public void run() { while (true) { try { Criteria criteria1 = session.createCriteria(RubbishUsers.class); criteria1.add(Restrictions.and(Restrictions.or(Restrictions .like("smsContent", StringUtils .center(RubbishUsers.SENSITIVE_KEYWORD1, RubbishUsers.SENSITIVE_KEYWORD1 .length() + 2, "%"), MatchMode.ANYWHERE), Restrictions.like( "smsContent", StringUtils .center(RubbishUsers.SENSITIVE_KEYWORD2, RubbishUsers.SENSITIVE_KEYWORD2 .length() + 2, "%"), MatchMode.ANYWHERE)), Restrictions.in("homeCity", RubbishUsers.SENSITIVE_HOMECITYS))); List<RubbishUsers> rubbishList = (List<RubbishUsers>) criteria1.list(); System.out.println(StringUtils.center("[SensitiveTrace 敏感用戶清單如下]", 40, "-")); if (rubbishList != null) { System.out.println("[SensitiveTrace 敏感用戶數量]:" + rubbishList.size()); for (RubbishUsers rubbish : rubbishList) { System.out.println(rubbish + rubbish.getSmsContent()); } } else { System.out.println("[SensitiveTrace 敏感用戶數量]:0"); } } catch (HibernateException e) { e.printStackTrace(); } try { Thread.sleep(sensitiveMonitorTimeInterval * 1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } // 分散式環境下麵的要同步控制 private synchronized void save(Tuple input) { Session session = sessionFactory.openSession(); try { RubbishUsers users = new RubbishUsers(); users.setUserId(Integer.parseInt(input .getStringByField(RubbishUsers.USERID_COLUMNNAME))); users.setHomeCity(Integer.parseInt(input .getStringByField(RubbishUsers.HOMECITY_COLUMNNAME))); users.setMsisdn(Integer.parseInt(input .getStringByField(RubbishUsers.MSISDN_COLUMNNAME))); users.setSmsContent(input .getStringByField(RubbishUsers.SMSCONTENT_COLUMNNAME)); Predicate isSensitiveFileAnalysis = new SensitivePredicate( (String) input.getStringByField(RubbishUsers.SMSCONTENT_COLUMNNAME)); FilterIterator iterator = new FilterIterator(list.iterator(),isSensitiveFileAnalysis); if (iterator.hasNext()) { session.beginTransaction(); // 入庫MySQL session.save(users); session.getTransaction().commit(); } } catch (HibernateException e) { e.printStackTrace(); session.getTransaction().rollback(); } finally { session.close(); } } // 很多情況下麵storm運行期執行報錯,都是由於execute有異常導致的,重點觀察execute的函數邏輯 // 最經常報錯的情況是報告:ERROR backtype.storm.daemon.executor - java.lang.RuntimeException:java.lang.NullPointerException // backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java ...) // 類似這樣的錯誤,有點莫名其妙,開始都運行的很正常,後面忽然就報空指針異常了,我開始以為是storm部署的問題, // 後面jstack跟蹤發現,主要還是execute邏輯的問題,所以遇到這類的問題不要手忙腳亂,適當結合jstack跟蹤定位 @Override public void execute(Tuple input, BasicOutputCollector collector) { save(input); } public Map<String, Object> getComponentConfiguration() { return null; } @Override public void prepare(Map stormConf, TopologyContext context) { final int sensitiveMonitorTimeInterval = Integer.parseInt(stormConf .get("RUBBISHMONITOR_INTERVAL").toString()); SensitiveMonitorThread montor = new SensitiveMonitorThread( sensitiveMonitorTimeInterval); new Thread(montor).start(); } @Override public void cleanup() { // TODO Auto-generated method stub } @Override public void declareOutputFields(OutputFieldsDeclarer arg0) { // TODO Auto-generated method stub } }
由於是通過hibernate入庫到MySQL,所以給出hibernate配置,首先是:hibernate.cfg.xml
<?xml version="1.0" encoding="utf-8"?> <!DOCTYPE hibernate-configuration PUBLIC "-//Hibernate/Hibernate Configuration DTD 3.0//EN" "http://hibernate.sourceforge.net/hibernate-configuration-3.0.dtd"> <hibernate-configuration> <session-factory> <property name="hibernate.bytecode.use_reflection_optimizer">false</property> <property name="hibernate.dialect">org.hibernate.dialect.MySQLDialect</property> <property name="show_sql">true</property> <mapping resource="newlandframework/storm/resource/rubbish-users.hbm.xml"/> </session-factory> </hibernate-configuration>
對應的ORM映射配置文件rubbish-users.hbm.xml內容如下:
<?xml version="1.0"?> <!DOCTYPE hibernate-mapping PUBLIC "-//Hibernate/Hibernate Mapping DTD 3.0//EN" "http://hibernate.sourceforge.net/hibernate-mapping-3.0.dtd"> <hibernate-mapping> <class name="newlandframework.storm.model.RubbishUsers" table="rubbish_users" catalog="ccs"> <id name="userId" type="java.lang.Integer"> <column name="user_id"/> <generator class="assigned"/> </id> <property name="homeCity" type="java.lang.Integer"> <column name="home_city" not-null="true"/> </property> <property name="msisdn" type="java.lang.Integer"> <column name="msisdn" not-null="true"/> </property> <property name="smsContent" type="java.lang.String"> <column name="sms_content" not-null="true"/> </property> </class> </hibernate-mapping>
最後,還是通過Spring把hibernate集成起來,資料庫連接池用的是:DBCP。對應的Spring配置文件jdbc-hibernate-bean.xml的內容如下:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsdhttp://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd" default-autowire="byType" default-lazy-init="false"> <bean id="placeholder" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>newlandframework/storm/resource/jdbc.properties</value> </list> </property> </bean> <bean id="dbcpDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="${database.driverClassName}"/> <property name="url" value="${database.url}"/> <property name="username" value="${database.username}"/> <property name="password" value="${database.password}"/> <property name="maxActive" value="32"/> <property name="initialSize" value="1"/> <property name="maxWait" value="60000"/> <property name="maxIdle" value="32"/> <property name="minIdle" value="5"/> <property name="removeAbandoned" value="true"/> <property name="removeAbandonedTimeout" value="180"/> <property name="connectionProperties" value="bigStringTryClob=true;clientEncoding=GBK;defaultRowPrefetch=50;serverEncoding=ISO-8859-1"/> <property name="timeBetweenEvictionRunsMillis"> <value>60000</value> </property> <property name="minEvictableIdleTimeMillis"> <value>1800000</value> </property> </bean> <!-- hibernate session factory --> <bean id="sessionFactory" class="org.springframework.orm.hibernate3.LocalSessionFactoryBean"> <property name="dataSource" ref="dbcpDataSource"/> <property name="configLocation" value="newlandframework/storm/resource/hibernate.cfg.xml"/> <property name="eventListeners"> <map></map> </property> <property name="entityCacheStrategies"> <props></props> </property> <property name="collectionCacheStrategies"> <props></props> </property> <property name="configurationClass"> <value>org.hibernate.cfg.AnnotationConfiguration</value> </property> </bean> <bean id="hibernateTemplete" class="org.springframework.orm.hibernate3.HibernateTemplate"> <property name="sessionFactory" ref="sessionFactory"/> </bean> </beans>
到此為止,我們已經完成了敏感信息實時監控的所有的Storm組件的開發。現在,我們來完成Storm的拓撲(Topology),由於拓撲(Topology)又分為本地拓撲和分散式拓撲,因此封裝了一個工具類StormRunner(拓撲執行器),對應的代碼如下:
/** * @filename:StormRunner.java * * Newland Co. Ltd. All rights reserved. * * @Description:拓撲執行器 * @author tangjie * @version 1.0 * */ package newlandframework.storm.topology; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.generated.StormTopology; public final class StormRunner { private static final int MILLIS_IN_SEC = 1000; // 本地拓撲 Storm用一個進程裡面的N個線程進行模擬 public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException { LocalCluster cluster = new LocalCluster(); cluster.submitTopology(topologyName, conf, topology); Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC); cluster.killTopology(topologyName); cluster.shutdown(); } // 分散式拓撲 真正的Storm集群運行環境 public static void runTopologyRemotely(StormTopology topology, String topologyName, Config conf) throws AlreadyAliveException, InvalidTopologyException { StormSubmitter.submitTopology(top