業務描述: 統計從kafka spout中讀取的數據條數,以及寫入redis的數據的條數,寫入hdfs的數據條數,寫入kafaka的數據條數。並且每過5秒將數據按照json文件的形式寫入日誌。其中保存為json數據的格式為:時間戳 + 進程名稱 + 讀kafka數據條數 + 寫入redis數據條數 ...
業務描述:
統計從kafka spout中讀取的數據條數,以及寫入redis的數據的條數,寫入hdfs的數據條數,寫入kafaka的數據條數。並且每過5秒將數據按照json文件的形式寫入日誌。其中保存為json數據的格式為:時間戳 + 進程名稱 + 讀kafka數據條數 + 寫入redis數據條數 + 寫入hbase條數 + 寫入kafka條數。time_stamp + process_name + from_kafka + to_redis + to_hdfs + to_kafka
給出實現的關鍵代碼:
package count; import java.io.FileWriter; import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import bolt.addGridNo; import bolt.transGPS; /* * 寫在main函數裡面了,因此只是跑在nimbus上面 * 這樣做是不對的!!! */ public class countflow{ private static String fileName = "/home/storm/countflow";//定義寫文件路徑 static FileWriter writer = null;//文件讀寫流 //private static Timer timer = new Timer();//計時器,每過5秒鐘進行寫數據 // private static countflow uniqueInstance; // private countflow(){} // public static countflow getInstance(){ // if(uniqueInstance == null){ // uniqueInstance = new countflow(); // } // return uniqueInstance; // } /* public void run() { try { writer = new FileWriter(fileName, true); } catch (IOException e) { e.printStackTrace(); } executeFixedRate(); /*Timer timer = new Timer(); timer.schedule(new count(writer), 0, 5000);//每過5秒調用新建一個count類並且將writer傳入。 } */ /** * 以固定周期頻率執行任務 * @throws IOException */ public static void executeFixedRate() throws IOException { writer = new FileWriter(fileName, true); ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); executor.scheduleAtFixedRate( new count(writer), 0, 5000, TimeUnit.MILLISECONDS); } static class count implements Runnable{ private FileWriter writer = null; //設置日期格式 private static DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); count(FileWriter writer){//構造函數 this.writer = writer; } public void run(){//運行代碼 try { writer.write("Bolt"+addGridNo.indexId+" From Grid and GPS "+"<"+df.format(new Date())+">strom_flow,<"+addGridNo.countGrid()+">,<"+transGPS.countGPS()+">\n"); writer.flush(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
某一個需要統計的bolt中的代碼
package bolt; import java.io.FileWriter; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; import backtype.storm.Config; import backtype.storm.Constants; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import topology.topo; /** * @author ZPF * */ public class addGridNo extends BaseRichBolt { private static final long serialVersionUID = -6586283337287975719L; public static int numOfGrid = 0; private static FileWriter writer = null; private static DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private static String fileName = "/home/storm/testflowgrid";//定義寫文件路徑 private OutputCollector collector; public static int countGrid(){ return numOfGrid; } @Override public void prepare(Map config, TopologyContext context, OutputCollector collector) { this. collector = collector; } @Override public void execute(Tuple tuple) { topo.numOfGrid++; numOfGrid++; String line = tuple.getString(0); synchronized (collector){ collector.emit(new Values(line.toString())); } synchronized (collector){ collector.ack(tuple); } synchronized (collector){ collector.fail(tuple); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("GPSWithGridNo")); } }
最後在topo的main函數中需要bulid一個topology。然後設置該topology的屬性,以及指定讀取數據的路徑,數據採用何種分發方式,topology的併發數目為多少等相關設置。
另外一種統計的方法
package bolt; import java.io.FileWriter; import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; import topology.topo; import backtype.storm.Config; import backtype.storm.Constants; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** * * 實現日誌編寫 * author ZPF * */ public class transGPS extends BaseRichBolt{ private static final long serialVersionUID = -5653803832498574866L; public static int numOfGPS = 0;//統計計算GPS的次數 numOfGPS private static FileWriter writer = null; private static DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private static String fileName = "/home/storm/countflow";//定義寫文件路徑 private OutputCollector collector; public void prepare(Map config, TopologyContext context, OutputCollector collector) { this. collector = collector; } /* * 返回統計次數 */ public static int countGPS(){ return numOfGPS; } @Override public void execute(Tuple tuple) { addGridNo.numOfGrid = topo.numOfGrid; numOfGPS = topo.numOfGPS; try { if(isTickTuple(tuple)){ writer = new FileWriter(fileName, true); String str = null; writer.write("{\"time_stamp\":\"" +df.format(new Date())+ "\",\"process_name\":\"" + "strom_flow" + "\",\"from_kafka:"+addGridNo.numOfGrid +"\",\"to_redis:"+topo.numOfGPS+"}\n"); //writer.write("Grid and GPS "+df.format(new Date())+",strom_flow,"+topo.numOfGrid+","+topo.numOfGPS+"\n"); writer.flush(); } else{ numOfGPS++; topo.numOfGPS++; String line = tuple.getString(0);//json格式 synchronized (collector){ collector.emit(new Values(line.toString())); } synchronized (collector){ collector.ack(tuple); } synchronized (collector){ collector.fail(tuple); } } } catch (IOException e1) { e1.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("GPS02")); } @Override public Map<String, Object> getComponentConfiguration() { Map<String, Object> conf = new HashMap<String, Object>(); conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);//每5s持久化一次數據 return conf; } // @Override // public void cleanup() { // // // TODO Auto-generated method stub // // try { // writer = new FileWriter(fileName, true); // String str = null; // writer.write("{\"time_stamp\":\"" +df.format(new Date())+ "\",\"process_name\":\"" + "strom_flow" // + "\",\"from_kafka\":"+topo.numOfGrid // +"\",\"to_redis\":"+topo.numOfGPS+"}\n"); // //writer.write("Grid and GPS "+df.format(new Date())+",strom_flow,"+topo.numOfGrid+","+topo.numOfGPS+"\n"); // writer.flush(); // addGridNo.numOfGrid = topo.numOfGrid; // numOfGPS = topo.numOfGPS; // // } catch (IOException e) { // // // TODO Auto-generated catch block // // e.printStackTrace(); // // } // // } public static boolean isTickTuple(Tuple tuple) { return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID); } }
出現的問題:
日誌的多次重寫以及時間戳的延時問題。
分析已知結果:
已知的問題以及出現的結果:
- numOfGrid以及numOfGPS以及其他的變數目前都是聲明為靜態私有變數,這裡numOfGrid和numOfGPS無論聲明在bolt中還是topo中 都是可以正確得到答案。
- 目前使用的統計方法。為了實現5秒向文件寫一次數據。現在大致可以分為兩種方法。一種是在提交topo之後開啟一個線程進行統計和寫入讀入的數據條數。另一種方法是使用靜態的函數,函數在bolt被調用。
- 每一個bolt都可以產生多個task,這是從網上摘過來的,因此在一個bolt的prepare中 運行的代碼其實不只是運行了一次,到底是每一個task都運行了這個prepare還是在每一臺機器上都運行了這個prepare程式?如果運行次數不止一次的話,那麼 向文件中寫入的線程或者靜態程式就不僅僅開啟或者調用了一次,那麼就可以解釋了為什麼產生了文件的多次讀寫操作!!!
- 當一臺機器出現故障,經常的是storm3崩潰了。那麼最終的統計結果肯定出現問題。至於結果是否可以簡單地相加,現在還是不清楚。為什麼不清楚能不能簡單的相加呢?其一是不知道線程究竟開啟了多少個或者靜態的統計函數究竟調用了多少次!另外如果簡單的將重覆的數據忽略的話,並且三台機器能夠正常運行,那麼最後的結果經過另外自己開啟的一個topic測試過,消費的數據和壓入的總數據的條數相等。但是一臺機器崩潰結果就不等於topic中的總數據條數!!
- 老師上課提到了線程安全問題,那麼根據網上的講解來看,將變數全部設置為全局靜態變數這肯定是有問題的。如果將代碼修改為可重入函數的話應該就可以解決文件重寫這個問題。那麼這個角度來看,前提是線程開啟的不止一個,而是多個。但是,如果不將變數設置為全局變數的話,只是為局部變數又需要怎麼修改來實時統計處理數據的條數呢。或者只是在topo中設置全局變數,每次將局部變數傳給外部,但是這還是有全局變數啊。
- 查看手冊調用數量!!!
emitted欄顯示的數字表示的是調用OutputCollector的emit方法的次數.
transferred欄顯示的數字表示的是實際tuple發送到下一個task的計數.
如果一個bolt A使用all group的方式(每一個bolt都要接收到)向bolt B發射tuple, 此時bolt B啟動了5個task, 那麼trasferred顯示的數量將是emitted的5倍.
如果一個bolt A內部執行了emit操作, 但是沒有指定tuple的接受者, 那麼transferred將為0.
另外collector.emit(new Values(xxx))和collector.emit(tuple, new Values(xxx)) 這兩種不同的emit方法也會影響後面bolt的emitted和transferred, 如果是前者, 則後續bolt的這兩個值都是0, 因為前一個emit方法是非安全的, 不再使用acker來進行校驗.
分析造成該結果的原因:
Storm與傳統關係型資料庫
傳統關係型資料庫是先存後計算,而storm則是先算後存,甚至不存
傳統關係型資料庫很難部署實時計算,只能部署定時任務統計分析視窗數據
關係型資料庫重視事務,併發控制,相對來說Storm比較簡陋
Storm不Hadoop,Spark等是流行的大數據方案
與Storm關係密切的語言:核心代碼用clojure書寫,實用程式用python開發,使用java開發拓撲
來自 <http://www.open-open.com/lib/view/open1430095563146.html>
Storm集群中有兩種節點,一種是控制節點(Nimbus節點),另一種是工作節點(Supervisor節點)。所有Topology任務的提交必須在Storm客戶端節點上進行(需要配置 storm.yaml文件),由Nimbus節點分配給其他Supervisor節點進行處理。 Nimbus節點首先將提交的Topology進行分片,分成一個個的Task,並將Task和Supervisor相關的信息提交到 zookeeper集群上,Supervisor會去zookeeper集群上認領自己的Task,通知自己的Worker進程進行Task的處理。
和同樣是計算框架的MapReduce相比,MapReduce集群上運行的是Job,而Storm集群上運行的是Topology。但是Job在運行結束之後會自行結束,Topology卻只能被手動的kill掉,否則會一直運行下去
Storm不處理計算結果的保存,這是應用代碼需要負責的事情,如果數據不大,你可以簡單地保存在記憶體里,也可以每次都更新資料庫,也可以採用NoSQL存儲。這部分事情完全交給用戶。
數據存儲之後的展現,也是你需要自己處理的,storm UI 只提供對topology的監控和統計。
總體的Topology處理流程圖為:
來自 <http://www.open-open.com/lib/view/open1430095563146.html>
Bolt類接收由Spout或者其他上游Bolt類發來的Tuple,對其進行處理。Bolt組件的實現可以通過繼承BasicRichBolt類或者IRichBolt介面等來完成
prepare方法 -- 此方法和Spout中的open方法類似,在集群中一個worker中的task初始化時調用。 它提供了bolt執行的環境
declareOutputFields方法 -- 用於聲明當前Bolt發送的Tuple中包含的欄位(field),和Spout中類似
cleanup方法 -- 同ISpout的close方法,在關閉前調用。同樣不保證其一定執行。
execute方法 -- 這是Bolt中最關鍵的一個方法,對於Tuple的處理都可以放到此方法中進行。具體的發送是通過emit方法來完成的。execute接受一個 tuple進行處理,並用prepare方法傳入的OutputCollector的ack方法(表示成功)或fail(表示失敗)來反饋處理結果。
來自 <http://www.open-open.com/lib/view/open1430095563146.html>
嘗試解決文件多次重寫:
1.由於程式運行在三台不同的機器上,在進行多線程操作時,程式是否是安全的很關鍵!
使得rand函數變為線程安全的唯一方式是重寫它,使得它不再使用任何靜態數據,取而代之地依靠調用者在參數中傳遞狀態信息。這樣的缺點是,程式員現在要被迫改變調用程式的代碼。
來自 <http://www.cnblogs.com/xiangshancuizhu/archive/2012/10/22/2734497.html>
該問題的詳細描述:http://www.cnblogs.com/xiangshancuizhu/archive/2012/10/22/2734497.html
某些函數(如gethostbyname)將計算結果放在靜態結構中,並返回一個指向這個結構的指針。如果我們從併發線程中調用這些函數,那麼將可能發生災難,因為正在被一個線程使用的結果會被另一個線程悄悄地覆蓋了。
有兩種方法來處理這類線程不安全函數。一種是選擇重寫函數,使得調用者傳遞存放結果的結構地址。這就消除了所有共用數據,但是它要求程式員還要改寫調用者的代碼。
如果線程不安全函數是難以修改或不可修改的(例如,它是從一個庫中鏈接過來的),那麼另外一種選擇就是使用lock-and-copy(加鎖-拷貝)技術。這個概念將線程不安全函數與互斥鎖聯繫起來。在每個調用位置,對互斥鎖加鎖,調用函數不安全函數,動態地為結果非配存儲器,拷貝函數返回的結果到這個存儲器位置,然後對互斥鎖解鎖。一個吸引人的變化是定義了一個線程安全的封裝(wrapper)函數,它執行lock-and-copy,然後調用這個封轉函數來取代所有線程不安全的函數。
線程安全:一個函數被稱為線程安全的(thread-safe),當且僅當被多個併發進程反覆調用時,它會一直產生正確的結果。如果一個函數不是線程安全的,我們就說它是線程不安全的(thread-unsafe)。我們定義四類(有相交的)線程不安全函數。
第1類:不保護共用變數的函數
第2類:保持跨越多個調用的狀態函數
第3類:返回指向靜態變數指針的函數
第4類:調用線程不安全函數的函數
可重入函數
可重入函數:可重入函數是線程安全函數的一種,其特點在於它們被多個線程調用時,不會引用任何共用數據。可重入函數通常要比不可重入的線程安全函數效率高一些,因為它們不需要同步操作。更進一步說,將第2類線程不安全函數轉化為線程安全函數的唯一方法就是重寫它,使之可重入。
來自 <http://www.cnblogs.com/xiangshancuizhu/archive/2012/10/22/2734497.html>
2.也許將寫文件的函數寫為可重入函數可能會解決問題!!!
顯式可重入函數:如果所有函數的參數都是傳值傳遞的(沒有指針),並且所有的數據引用都是本地的自動棧變數(也就是說沒有引用靜態或全局變數),那麼函數就是顯示可重入的,也就是說不管如何調用,我們都可斷言它是可重入的。
隱式可重入函數:可重入函數中的一些參數是引用傳遞(使用了指針),也就是說,在調用線程小心地傳遞指向非共用數據的指針時,它才是可重入的。例如rand_r就是隱式可重入的。
我們使用可重入(reentrant)來包括顯式可重入函數和隱式可重入函數。然而,可重入性有時是調用者和被調用者共有的屬性,並不只是被調用者單獨的屬性3002
3.根據task的ID進行實時計算,每次統計每一個task處理的數據,然後將task的統計結果發送給外部的統計函數,可能解決重寫問題!!!
當數據量大到一定程度時就要使用併發,當併發需要考慮容錯與事務性時處理邏輯又會變得複雜起來。在Storm中,每個bolt可以啟動多個task,每一個task會有一個唯一的task ID。當需要持久化操作時,每個task必須把自己的中間狀態連帶自己的task ID一起持久化下來,而在故障恢復時,每個task只從資料庫中讀取屬於自己的狀態數據,否則很容易導致記憶體溢出。再加上有些業務邏輯要求多個task的數據必須在資料庫中一起commit,這又增加了複雜性。
來自 <http://blog.sina.com.cn/s/blog_6ff05a2c0101ficp.html>
但是這裡面臨著 問題:task ID是變化著的,如果某次程式崩潰,重啟之後發生錯誤。
如果在使用併發時想動態地調整併發數,那需要增加很多額外的處理邏輯。因為Storm預設的fieldsGrouping是根據併發數進行Hash計算取模。如果併發數變動,那麼每個數據流應該分配到哪個task中也就發生了變動。在故障恢復時,如果併發數發生了變化,每個task的task ID也會發生變化,這會導致一個task從資料庫中讀取不到本來屬於自己的那部分中間狀態數據。這時需要採用一致性Hash策略來解決該問題。
來自 <http://blog.sina.com.cn/s/blog_6ff05a2c0101ficp.html>
但是根據上面提出的各種方案,經過嘗試都失敗了!
找出為何失敗以及文件重覆讀寫是否真的是個錯誤。
首先,解釋文件重寫出現的原因:
無論是使用線程還是使用靜態的方法都是需要再bolt中的prepare函數中進行調用。根據上述對storm的運行以及結構分析,可以得到在分散式系統上的運行並不只是一臺機器上簡單的日誌統計而言。原因就在於storm採用分散式系統進行數據的處理操作。那麼函數的調用次數以及線程的開啟個數一定不會等於1。這一點需要十分註意!!!分散式系統並不是在一臺機器上跑,而且分散式系統在此而言是相對獨立的。而且我們自己無法提前將任務經行分配,比如說這一臺機器跑幾個,另一臺機器跑哪些。所以出現文件的重覆寫入是難以避免的,而且是正常的。
通過上面的分析,以及對storm的結構分析。這裡已經可以確定之前的判斷是不對的。對於起初懷疑文件重寫是個錯誤的假設已經被證實是不對的。那麼除此之外,在代碼書寫過程中,還有幾點需要註意。
接下來需要解決的問題:
除了之前錯誤認為的文件重寫之外,存在的另外一個問題就是時間戳的延遲問題。舉一些實際運行得到的結果:
{"time_stamp":"2016-10-18 16:01:03","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:03","process_name":"strom_flow","from_kafka:1399","to_redis:503","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:03","process_name":"strom_flow","from_kafka:1399","to_redis:503","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:03","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:04","process_name":"strom_flow","from_kafka:1401","to_redis:500","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:04","process_name":"strom_flow","from_kafka:1401","to_redis:500","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:04","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:04","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:04","process_name":"strom_flow","from_kafka:1397","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:04","process_name":"strom_flow","from_kafka:1397","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:08","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:08","process_name":"strom_flow","from_kafka:1399","to_redis:503","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:08","process_name":"strom_flow","from_kafka:1399","to_redis:503","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:08","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:09","process_name":"strom_flow","from_kafka:1401","to_redis:500","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:09","process_name":"strom_flow","from_kafka:1401","to_redis:500","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:09","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:09","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:09","process_name":"strom_flow","from_kafka:1397","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:09","process_name":"strom_flow","from_kafka:1397","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:13","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:13","process_name":"strom_flow","from_kafka:1399","to_redis:503","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:13","process_name":"strom_flow","from_kafka:1399","to_redis:503","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:13","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:14","process_name":"strom_flow","from_kafka:1401","to_redis:500","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:14","process_name":"strom_flow","from_kafka:1401","to_redis:500","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:14","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:14","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:14","process_name":"strom_flow","from_kafka:1397","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}
{"time_stamp":"2016-10-18 16:01:14","process_name":"strom_flow","from_kafka:1397","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}
上面的運行結果是在設置每5秒進行寫入文件。但是時間戳這裡產生了問題。說明重覆寫入過程中存在嚴重的延時。那麼接下來的工作除了合併重寫的數據之外還要降低延時。
接下來就是合併重覆數據,以及降低延時的處理了。
分散式系統的確不好考慮,問題各種各樣的。