Storm中遇到的日誌多次重寫問題(一)

来源:http://www.cnblogs.com/zpfbuaa/archive/2016/10/18/5974000.html
-Advertisement-
Play Games

業務描述: 統計從kafka spout中讀取的數據條數,以及寫入redis的數據的條數,寫入hdfs的數據條數,寫入kafaka的數據條數。並且每過5秒將數據按照json文件的形式寫入日誌。其中保存為json數據的格式為:時間戳 + 進程名稱 + 讀kafka數據條數 + 寫入redis數據條數 ...


業務描述:

  統計從kafka spout中讀取的數據條數,以及寫入redis的數據的條數,寫入hdfs的數據條數,寫入kafaka的數據條數。並且每過5秒將數據按照json文件的形式寫入日誌。其中保存為json數據的格式為:時間戳 + 進程名稱 + 讀kafka數據條數 + 寫入redis數據條數 + 寫入hbase條數 + 寫入kafka條數。time_stamp + process_name + from_kafka + to_redis + to_hdfs + to_kafka

給出實現的關鍵代碼:

  

package count;


import java.io.FileWriter;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import bolt.addGridNo;
import bolt.transGPS;
/*
 * 寫在main函數裡面了,因此只是跑在nimbus上面
 * 這樣做是不對的!!!
 */
public class countflow{
    
    private static String fileName = "/home/storm/countflow";//定義寫文件路徑
    static FileWriter writer = null;//文件讀寫流
    //private static Timer timer = new Timer();//計時器,每過5秒鐘進行寫數據
//    private static countflow uniqueInstance;
//    private countflow(){}
//    public static countflow getInstance(){
//        if(uniqueInstance == null){
//            uniqueInstance = new countflow();
//        }
//        return uniqueInstance;
//    }
    /*
    public void run() {
        try {
            
            writer = new FileWriter(fileName, true);
    
        } catch (IOException e) {
            e.printStackTrace();
        }
        executeFixedRate();
        /*Timer timer = new Timer();
        timer.schedule(new count(writer), 0, 5000);//每過5秒調用新建一個count類並且將writer傳入。
    }
    */
    /**
     * 以固定周期頻率執行任務
     * @throws IOException 
     */
    public static void executeFixedRate() throws IOException {
        writer = new FileWriter(fileName, true);
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        executor.scheduleAtFixedRate(
                new count(writer),
                0,
                5000,
                TimeUnit.MILLISECONDS);
    }
    static class count implements Runnable{
        
        private FileWriter writer = null;
        //設置日期格式
        private static DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        
        count(FileWriter writer){//構造函數
            this.writer = writer;
        }
        public void run(){//運行代碼   
            try {
                writer.write("Bolt"+addGridNo.indexId+" From Grid and GPS "+"<"+df.format(new Date())+">strom_flow,<"+addGridNo.countGrid()+">,<"+transGPS.countGPS()+">\n");
                writer.flush();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }   
    }
}

某一個需要統計的bolt中的代碼

  

package bolt;



import java.io.FileWriter;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import topology.topo;

/**
 * @author ZPF
 *
 */

public class addGridNo extends BaseRichBolt {
    private static final long serialVersionUID = -6586283337287975719L;
    public static int numOfGrid = 0;
    private static FileWriter writer = null;
    
    private static DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
    private static String fileName = "/home/storm/testflowgrid";//定義寫文件路徑
    private OutputCollector collector;
    
    public static int countGrid(){
        return numOfGrid;
    }
    
    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {       
        this. collector = collector;
      }
    @Override
    public void execute(Tuple tuple) {
        topo.numOfGrid++;
        numOfGrid++;
        String line = tuple.getString(0);    
        
        synchronized (collector){ 
            collector.emit(new Values(line.toString()));
        }    
        synchronized (collector){  
            collector.ack(tuple); 
        }
        synchronized (collector){ 
            collector.fail(tuple);
        }
        
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("GPSWithGridNo"));
    }
}

 

最後在topo的main函數中需要bulid一個topology。然後設置該topology的屬性,以及指定讀取數據的路徑,數據採用何種分發方式,topology的併發數目為多少等相關設置。

另外一種統計的方法

  

package bolt;

import java.io.FileWriter;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import topology.topo;
import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * 
 * 實現日誌編寫
 * author ZPF
 * 
 */

public class transGPS extends BaseRichBolt{
    
    private static final long serialVersionUID = -5653803832498574866L;
    
    public static int numOfGPS = 0;//統計計算GPS的次數 numOfGPS
    
    private static FileWriter writer = null;
    
    private static DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
    private static String fileName = "/home/storm/countflow";//定義寫文件路徑

    
    private OutputCollector collector;  
     public void prepare(Map config, TopologyContext context, OutputCollector collector) {  
         this. collector = collector;  
      }
     /*
      * 返回統計次數
      */
    public static int countGPS(){
        return numOfGPS;
    }

    @Override
    public void execute(Tuple tuple) {
        addGridNo.numOfGrid = topo.numOfGrid;
        numOfGPS = topo.numOfGPS;
        try {
            if(isTickTuple(tuple)){
                writer = new FileWriter(fileName, true);
                String str = null;
                writer.write("{\"time_stamp\":\"" +df.format(new Date())+ "\",\"process_name\":\"" + "strom_flow" 
                                + "\",\"from_kafka:"+addGridNo.numOfGrid
                                +"\",\"to_redis:"+topo.numOfGPS+"}\n");
                //writer.write("Grid and GPS "+df.format(new Date())+",strom_flow,"+topo.numOfGrid+","+topo.numOfGPS+"\n");
                writer.flush();
            }
            else{
                numOfGPS++;
                topo.numOfGPS++;
                String line = tuple.getString(0);//json格式
                    synchronized (collector){
                        collector.emit(new Values(line.toString()));
                    }    
                    synchronized (collector){  
                        collector.ack(tuple);  
                    }
                    synchronized (collector){  
                        collector.fail(tuple);  
                    }
            }
        }
    catch (IOException e1) {
            e1.printStackTrace();
        }
        
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("GPS02"));
    }
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);//每5s持久化一次數據
        return conf;
    }
//    @Override
//    public void cleanup() {
//
//        // TODO Auto-generated method stub
//
//        try {
//            writer = new FileWriter(fileName, true);
//            String str = null;
//            writer.write("{\"time_stamp\":\"" +df.format(new Date())+ "\",\"process_name\":\"" + "strom_flow" 
//                            + "\",\"from_kafka\":"+topo.numOfGrid
//                            +"\",\"to_redis\":"+topo.numOfGPS+"}\n");
//            //writer.write("Grid and GPS "+df.format(new Date())+",strom_flow,"+topo.numOfGrid+","+topo.numOfGPS+"\n");
//            writer.flush();
//            addGridNo.numOfGrid = topo.numOfGrid;
//            numOfGPS = topo.numOfGPS;
//
//        } catch (IOException e) {
//
//            // TODO Auto-generated catch block
//
//            e.printStackTrace();
//
//        }
//
//    }
    public static boolean isTickTuple(Tuple tuple) {
        
        return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
                && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
        }    
    }

 

 

 

出現的問題:

  日誌的多次重寫以及時間戳的延時問題。

分析已知結果:

  已知的問題以及出現的結果:

  1. numOfGrid以及numOfGPS以及其他的變數目前都是聲明為靜態私有變數,這裡numOfGrid和numOfGPS無論聲明在bolt中還是topo中 都是可以正確得到答案。
  2. 目前使用的統計方法。為了實現5秒向文件寫一次數據。現在大致可以分為兩種方法。一種是在提交topo之後開啟一個線程進行統計和寫入讀入的數據條數。另一種方法是使用靜態的函數,函數在bolt被調用。
  3. 每一個bolt都可以產生多個task,這是從網上摘過來的,因此在一個bolt的prepare中 運行的代碼其實不只是運行了一次,到底是每一個task都運行了這個prepare還是在每一臺機器上都運行了這個prepare程式?如果運行次數不止一次的話,那麼 向文件中寫入的線程或者靜態程式就不僅僅開啟或者調用了一次,那麼就可以解釋了為什麼產生了文件的多次讀寫操作!!!
  4. 當一臺機器出現故障,經常的是storm3崩潰了。那麼最終的統計結果肯定出現問題。至於結果是否可以簡單地相加,現在還是不清楚。為什麼不清楚能不能簡單的相加呢?其一是不知道線程究竟開啟了多少個或者靜態的統計函數究竟調用了多少次!另外如果簡單的將重覆的數據忽略的話,並且三台機器能夠正常運行,那麼最後的結果經過另外自己開啟的一個topic測試過,消費的數據和壓入的總數據的條數相等。但是一臺機器崩潰結果就不等於topic中的總數據條數!!
  5. 老師上課提到了線程安全問題,那麼根據網上的講解來看,將變數全部設置為全局靜態變數這肯定是有問題的。如果將代碼修改為可重入函數的話應該就可以解決文件重寫這個問題。那麼這個角度來看,前提是線程開啟的不止一個,而是多個。但是,如果不將變數設置為全局變數的話,只是為局部變數又需要怎麼修改來實時統計處理數據的條數呢。或者只是在topo中設置全局變數,每次將局部變數傳給外部,但是這還是有全局變數啊。
  6. 查看手冊調用數量!!!

  emitted欄顯示的數字表示的是調用OutputCollector的emit方法的次數.

  transferred欄顯示的數字表示的是實際tuple發送到下一個task的計數. 

  如果一個bolt A使用all group的方式(每一個bolt都要接收到)向bolt B發射tuple, 此時bolt B啟動了5個task, 那麼trasferred顯示的數量將是emitted的5倍. 

  如果一個bolt A內部執行了emit操作, 但是沒有指定tuple的接受者, 那麼transferred將為0.

  另外collector.emit(new Values(xxx))和collector.emit(tuple, new Values(xxx)) 這兩種不同的emit方法也會影響後面bolt的emitted和transferred, 如果是前者, 則後續bolt的這兩個值都是0, 因為前一個emit方法是非安全的, 不再使用acker來進行校驗.

分析造成該結果的原因:

   clip_image001

  clip_image002

    clip_image003

Storm與傳統關係型資料庫 

    傳統關係型資料庫是先存後計算,而storm則是先算後存,甚至不存 

    傳統關係型資料庫很難部署實時計算,只能部署定時任務統計分析視窗數據 

    關係型資料庫重視事務,併發控制,相對來說Storm比較簡陋 

    Storm不Hadoop,Spark等是流行的大數據方案 

    與Storm關係密切的語言:核心代碼用clojure書寫,實用程式用python開發,使用java開發拓撲 

  來自 <http://www.open-open.com/lib/view/open1430095563146.html>

  Storm集群中有兩種節點,一種是控制節點(Nimbus節點),另一種是工作節點(Supervisor節點)。所有Topology任務的提交必須在Storm客戶端節點上進行(需要配置 storm.yaml文件),由Nimbus節點分配給其他Supervisor節點進行處理。 Nimbus節點首先將提交的Topology進行分片,分成一個個的Task,並將Task和Supervisor相關的信息提交到 zookeeper集群上,Supervisor會去zookeeper集群上認領自己的Task,通知自己的Worker進程進行Task的處理。 

和同樣是計算框架的MapReduce相比,MapReduce集群上運行的是Job,而Storm集群上運行的是Topology。但是Job在運行結束之後會自行結束,Topology卻只能被手動的kill掉,否則會一直運行下去

Storm不處理計算結果的保存,這是應用代碼需要負責的事情,如果數據不大,你可以簡單地保存在記憶體里,也可以每次都更新資料庫,也可以採用NoSQL存儲。這部分事情完全交給用戶。

    數據存儲之後的展現,也是你需要自己處理的,storm UI 只提供對topology的監控和統計。 

    總體的Topology處理流程圖為: 

clip_image004

來自 <http://www.open-open.com/lib/view/open1430095563146.html>

clip_image005

    Bolt類接收由Spout或者其他上游Bolt類發來的Tuple,對其進行處理。Bolt組件的實現可以通過繼承BasicRichBolt類或者IRichBolt介面等來完成 

    prepare方法 -- 此方法和Spout中的open方法類似,在集群中一個worker中的task初始化時調用。 它提供了bolt執行的環境

    declareOutputFields方法 -- 用於聲明當前Bolt發送的Tuple中包含的欄位(field),和Spout中類似 

    cleanup方法 -- 同ISpout的close方法,在關閉前調用。同樣不保證其一定執行。 

    execute方法 -- 這是Bolt中最關鍵的一個方法,對於Tuple的處理都可以放到此方法中進行。具體的發送是通過emit方法來完成的。execute接受一個 tuple進行處理,並用prepare方法傳入的OutputCollector的ack方法(表示成功)或fail(表示失敗)來反饋處理結果。

來自 <http://www.open-open.com/lib/view/open1430095563146.html>

嘗試解決文件多次重寫:

  1.由於程式運行在三台不同的機器上,在進行多線程操作時,程式是否是安全的很關鍵!

使得rand函數變為線程安全的唯一方式是重寫它,使得它不再使用任何靜態數據,取而代之地依靠調用者在參數中傳遞狀態信息。這樣的缺點是,程式員現在要被迫改變調用程式的代碼。

  來自 <http://www.cnblogs.com/xiangshancuizhu/archive/2012/10/22/2734497.html>

  該問題的詳細描述:http://www.cnblogs.com/xiangshancuizhu/archive/2012/10/22/2734497.html

  某些函數(如gethostbyname)將計算結果放在靜態結構中,並返回一個指向這個結構的指針。如果我們從併發線程中調用這些函數,那麼將可能發生災難,因為正在被一個線程使用的結果會被另一個線程悄悄地覆蓋了。

  有兩種方法來處理這類線程不安全函數。一種是選擇重寫函數,使得調用者傳遞存放結果的結構地址。這就消除了所有共用數據,但是它要求程式員還要改寫調用者的代碼。

  如果線程不安全函數是難以修改或不可修改的(例如,它是從一個庫中鏈接過來的),那麼另外一種選擇就是使用lock-and-copy(加鎖-拷貝)技術。這個概念將線程不安全函數與互斥鎖聯繫起來。在每個調用位置,對互斥鎖加鎖,調用函數不安全函數,動態地為結果非配存儲器,拷貝函數返回的結果到這個存儲器位置,然後對互斥鎖解鎖。一個吸引人的變化是定義了一個線程安全的封裝(wrapper)函數,它執行lock-and-copy,然後調用這個封轉函數來取代所有線程不安全的函數。

  線程安全:一個函數被稱為線程安全的(thread-safe),當且僅當被多個併發進程反覆調用時,它會一直產生正確的結果。如果一個函數不是線程安全的,我們就說它是線程不安全的(thread-unsafe)。我們定義四類(有相交的)線程不安全函數。

  第1類:不保護共用變數的函數

  第2類:保持跨越多個調用的狀態函數

  第3類:返回指向靜態變數指針的函數

  第4類:調用線程不安全函數的函數

  可重入函數

  可重入函數:可重入函數是線程安全函數的一種,其特點在於它們被多個線程調用時,不會引用任何共用數據。可重入函數通常要比不可重入的線程安全函數效率高一些,因為它們不需要同步操作。更進一步說,將第2類線程不安全函數轉化為線程安全函數的唯一方法就是重寫它,使之可重入。

來自 <http://www.cnblogs.com/xiangshancuizhu/archive/2012/10/22/2734497.html>

2.也許將寫文件的函數寫為可重入函數可能會解決問題!!!

顯式可重入函數:如果所有函數的參數都是傳值傳遞的(沒有指針),並且所有的數據引用都是本地的自動棧變數(也就是說沒有引用靜態或全局變數),那麼函數就是顯示可重入的,也就是說不管如何調用,我們都可斷言它是可重入的。

隱式可重入函數:可重入函數中的一些參數是引用傳遞(使用了指針),也就是說,在調用線程小心地傳遞指向非共用數據的指針時,它才是可重入的。例如rand_r就是隱式可重入的。

我們使用可重入(reentrant)來包括顯式可重入函數和隱式可重入函數。然而,可重入性有時是調用者和被調用者共有的屬性,並不只是被調用者單獨的屬性3002

3.根據task的ID進行實時計算,每次統計每一個task處理的數據,然後將task的統計結果發送給外部的統計函數,可能解決重寫問題!!!

當數據量大到一定程度時就要使用併發,當併發需要考慮容錯與事務性時處理邏輯又會變得複雜起來。在Storm中,每個bolt可以啟動多個task,每一個task會有一個唯一的task ID。當需要持久化操作時,每個task必須把自己的中間狀態連帶自己的task ID一起持久化下來,而在故障恢復時,每個task只從資料庫中讀取屬於自己的狀態數據,否則很容易導致記憶體溢出。再加上有些業務邏輯要求多個task的數據必須在資料庫中一起commit,這又增加了複雜性。

來自 <http://blog.sina.com.cn/s/blog_6ff05a2c0101ficp.html>

但是這裡面臨著 問題:task ID是變化著的,如果某次程式崩潰,重啟之後發生錯誤。

如果在使用併發時想動態地調整併發數,那需要增加很多額外的處理邏輯。因為Storm預設的fieldsGrouping是根據併發數進行Hash計算取模。如果併發數變動,那麼每個數據流應該分配到哪個task中也就發生了變動。在故障恢復時,如果併發數發生了變化,每個task的task ID也會發生變化,這會導致一個task從資料庫中讀取不到本來屬於自己的那部分中間狀態數據。這時需要採用一致性Hash策略來解決該問題。

來自 <http://blog.sina.com.cn/s/blog_6ff05a2c0101ficp.html>

  但是根據上面提出的各種方案,經過嘗試都失敗了!

                              

找出為何失敗以及文件重覆讀寫是否真的是個錯誤。

  首先,解釋文件重寫出現的原因:

  無論是使用線程還是使用靜態的方法都是需要再bolt中的prepare函數中進行調用。根據上述對storm的運行以及結構分析,可以得到在分散式系統上的運行並不只是一臺機器上簡單的日誌統計而言。原因就在於storm採用分散式系統進行數據的處理操作。那麼函數的調用次數以及線程的開啟個數一定不會等於1。這一點需要十分註意!!!分散式系統並不是在一臺機器上跑,而且分散式系統在此而言是相對獨立的。而且我們自己無法提前將任務經行分配,比如說這一臺機器跑幾個,另一臺機器跑哪些。所以出現文件的重覆寫入是難以避免的,而且是正常的。

  通過上面的分析,以及對storm的結構分析。這裡已經可以確定之前的判斷是不對的。對於起初懷疑文件重寫是個錯誤的假設已經被證實是不對的。那麼除此之外,在代碼書寫過程中,還有幾點需要註意。

  接下來需要解決的問題:

除了之前錯誤認為的文件重寫之外,存在的另外一個問題就是時間戳的延遲問題。舉一些實際運行得到的結果:

{"time_stamp":"2016-10-18 16:01:03","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:03","process_name":"strom_flow","from_kafka:1399","to_redis:503","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:03","process_name":"strom_flow","from_kafka:1399","to_redis:503","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:03","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:04","process_name":"strom_flow","from_kafka:1401","to_redis:500","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:04","process_name":"strom_flow","from_kafka:1401","to_redis:500","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:04","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:04","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:04","process_name":"strom_flow","from_kafka:1397","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:04","process_name":"strom_flow","from_kafka:1397","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:08","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:08","process_name":"strom_flow","from_kafka:1399","to_redis:503","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:08","process_name":"strom_flow","from_kafka:1399","to_redis:503","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:08","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:09","process_name":"strom_flow","from_kafka:1401","to_redis:500","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:09","process_name":"strom_flow","from_kafka:1401","to_redis:500","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:09","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:09","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:09","process_name":"strom_flow","from_kafka:1397","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:09","process_name":"strom_flow","from_kafka:1397","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:13","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:13","process_name":"strom_flow","from_kafka:1399","to_redis:503","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:13","process_name":"strom_flow","from_kafka:1399","to_redis:503","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:13","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:14","process_name":"strom_flow","from_kafka:1401","to_redis:500","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:14","process_name":"strom_flow","from_kafka:1401","to_redis:500","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:14","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:14","process_name":"strom_flow","from_kafka:1398","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:14","process_name":"strom_flow","from_kafka:1397","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}

{"time_stamp":"2016-10-18 16:01:14","process_name":"strom_flow","from_kafka:1397","to_redis:502","to_hbase:120","to_hdfs:119","to_kafka:911}

上面的運行結果是在設置每5秒進行寫入文件。但是時間戳這裡產生了問題。說明重覆寫入過程中存在嚴重的延時。那麼接下來的工作除了合併重寫的數據之外還要降低延時。

 

接下來就是合併重覆數據,以及降低延時的處理了。

分散式系統的確不好考慮,問題各種各樣的。

                       


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • MySQL 字元串截取函數:left(), right(), substring(), substring_index()。還有 mid(), substr()。其中,mid(), substr() 等價於 substring() 函數,substring() 的功能非常強大和靈活。 1. 字元串截 ...
  • 前言:真正用到mysql是在公司的第二個項目下,具體的一些在之前的博客文章(http://www.cnblogs.com/zhengzeze/p/5623440.html)中也提到了,其中涉及到,免安裝版的mysql的配置問題,之前是從網上博客中學習的。現在把它貼出來,免得以後找不到了。具體的文章轉 ...
  • 數據作為信息的載體,要分析數據中包含的主要信息,即要分析數據的主要特征(即數據的數字特征), 對於數據的數字特征, 包含數據的集中位置、分散程度和數據分佈,常用統計項目如下: 集中趨勢統計量: 均值(Mean)、中位數(Median)、眾數(Mode)、百分位數 離散趨勢統計量:標準差(sd)、方差 ...
  • 前幾日安裝sql server2008r2 的時候碰到這個問題: 出現以下錯誤: SQL Server 安裝程式在運行 Windows Installer 文件時遇到錯誤。 Windows Installer 錯誤消息: 打開安裝日誌文件的錯誤。請驗證指定的日誌文件位置是否存在,是否可以寫入。 Wi ...
  • 介紹 本篇文章主要介紹在oracle中怎樣使用語句創建用戶,如果你是資料庫運維人員那麼這是必須掌握的,順便提一下在oracle中資料庫的概念它和其它資料庫系統比如mysql和sqlserver不一樣,在oracle中可以將用戶理解成其它的資料庫系統中的資料庫的概念,oracle中只有一個全局資料庫並 ...
  • 在項目中一般需要對一些數據進行處理,以下提供一些基本的SQL語句: 1.基於條件的插入和修改:需要在表中插入一條記錄,插入前根據key標識判斷。如果標識符不存在,則插入新紀錄,如果標識符存在,則根據語句中所給的新值對原紀錄中的欄位進行更新: 2.在分組和集合里統計分組:當需要統計一個分組裡的成員,或 ...
  • 一、關於用戶 Oracle安裝會自動的生產sys用戶和system用戶: 1. sys用戶是超級用戶,具有最高許可權,具有sysdba角色,有create database的許可權,該用戶的預設密碼是change_on_install 。 2. system用戶是管理操作員,許可權也很大,具有sysope ...
  • 有個經典的題目:1-100之間的數字(不重覆)存放在表裡,共95行一列,但是裡面缺了5個數字,怎麼用SQL最快找出那五個數字。 我們先來看看Oracle資料庫如何實現,如下所示,我們先準備測試環境和數據。 SQL> create table t( id number(10)); Table crea... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...