[Flink]測試用的fake溫度感測器

来源:https://www.cnblogs.com/senwren/archive/2019/11/12/fake-snsr-Rd-src.html
-Advertisement-
Play Games

Flink-測試用的fake溫度感測器 Flink中,測試時,會用到自定義的source。 下為一例。。 該例使用溫度感測器的格式演示fake日誌數據源。 代碼用Scala寫的。 感測器... 感測器 - 樣例類 SensorReads.scala: ​x 1 package sr 2 ​ 3 /* ...


 

Flink-測試用的fake溫度感測器

 

Flink中,測試時,會用到自定義的source。

下為一例。。 該例使用溫度感測器的格式演示fake日誌數據源。

代碼用Scala寫的。

 

感測器...

 

  • 感測器 - 樣例類

    SensorReads.scala

          x         1
    package sr
    2
    3
    /**
    4
     * 
    5
     */
    6
    case class SensorReads(id:String,
    7
                           timestap:Long,
    8
                           tempture:Double)
       

 

  • 感測器 - 數據源模擬

    SnsorSrc_4096T.scala

          46         1
    package sr
    2
    3
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    4
    import scala.util.Random
    5
    6
    /**
    7
     * period, is 4096 millis.
    8
     */
    9
    case class SnsorSrc_4096T extends SourceFunction[SensorReads] {
    10
    
    
    
    
    11
    
    
        var isInRunning: Boolean = true
    12
    
    
    
    
    13
    
    
        ////
    14
        override def run(sourceContext: SourceFunction.SourceContext[
    15
                SensorReads]): Unit = {
    16
    
    
    
    
    17
    
    
    
    
    
    
    18
    
    
            val rand: Random = new Random
    19
    
    
    
    
    20
    
    
            var tptNow4 =
    21
                (1 to 4).map(
    22
                    "snsor_" + _.toString -> (23 + 16 * rand.nextGaussian))
    23
    24
    
    
    
    
    25
    
    
    
    
    
    
    26
    
    
            while (isInRunning) {
    27
                tptNow4 = tptNow4.map(
    28
                    t => t._1 -> (t._2 + rand.nextGaussian))
    29
    
    
    
    
    30
    
    
    
    
    
    
    31
    
    
                val timeStampNow: Long = System.currentTimeMillis
    32
    
    
    
    
    33
    
    
                tptNow4.foreach{
    34
                    t =>
    35
                        sourceContext.collect( // O.U.T
    36
                            SensorReads(t._1, timeStampNow, t._2) )
    37
                    Thread.sleep(512)  }
    38
                //not set, is stm
    39
    
    
    
    
    40
    
    
                Thread.sleep(2048)  }
    41
    
    
    
    
    42
    
    
        }
    43
    
    
    
    
    44
    
    
        override def cancel(): Unit = isInRunning = false
    45
    
    
    
    
    46
    
    
    }
       

 

 

測試

 

SnsrSrcAappli.scala

      13         1
package applis
2
3
import org.apache.flink.streaming.api.scala._
4
import sr._
5
6
object SnsrSrcAappli extends App{
7
    val env = StreamExecutionEnvironment.getExecutionEnvironment
8




9

    env.addSource(SnsorSrc_4096T() )
10
                    .print("aaa")
11




12

    env.execute()
13
}
   

 

數據源模擬用case-class,此處使用則可以不寫new。

 

輸出

 

IDEA控制臺上run:

      17         1
log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
2
log4j:WARN Please initialize the log4j system properly.
3
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
4
aaa:3> SensorReads(snsor_1,1573556705508,30.383394411578916)
5
aaa:4> SensorReads(snsor_2,1573556705508,21.397405872448672)
6
aaa:5> SensorReads(snsor_3,1573556705508,20.598086139457727)
7
aaa:6> SensorReads(snsor_4,1573556705508,18.30066983735531)
8
aaa:7> SensorReads(snsor_1,1573556709627,30.120955223032546)
9
aaa:8> SensorReads(snsor_2,1573556709627,22.38746867201145)
10
aaa:1> SensorReads(snsor_3,1573556709627,20.45357507067989)
11
aaa:2> SensorReads(snsor_4,1573556709627,17.18467261133715)
12
aaa:3> SensorReads(snsor_1,1573556713729,31.686487593592904)
13
aaa:4> SensorReads(snsor_2,1573556713729,20.67106361911623)
14
aaa:5> SensorReads(snsor_3,1573556713729,21.27724215221553)
15
aaa:6> SensorReads(snsor_4,1573556713729,16.84273306583804)
16
17
Process finished with exit code -1
   

 

...

如果SnsorSrc_4096T.scala中,「當前溫度」.foreach這樣寫:

      5         1
tptNow4.foreach{
2
    t =>
3
        sourceContext.collect( // O.U.T
4
            SensorReads(t._1, System.currentTimeMillis, t._2) )
5
    Thread.sleep(512)  }
   

 

那麼結果就會是:

      25         1
log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
2
log4j:WARN Please initialize the log4j system properly.
3
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
4
aaa:5> SensorReads(snsor_1,1573561932216,20.427373784204445)
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Linux下安裝zip解壓功能 Linux伺服器上一般預設沒是沒有有安裝zip命令 安裝zip指令 apt-get install zip 或 yum install zip 輸入zip OK linux安裝unzip命令:apt-get install unzip 或 yum install un ...
  • 1. configuer configure 是一個shell腳本,用來檢測安裝平臺是否符合要求,並通過配置參數生成makefile文件 2. make (make all 的簡寫) 編譯命令,從makefile文件中讀取命令,產生目標文件和可執行文件 3. make clean 清除編譯產生的可執 ...
  • ⽹絡時間服務和chrony 實驗練習: 1. 準備實驗環境: 2. 時間同步(centos6) 3. ntp軟體實現時間同步(centos6) centos6上預設安裝了ntp軟體包(包括客戶端和伺服器端),但是ntp同步需要⼀定時間才能完全同步時間的,⽽chrony同步時間⽐ntp快。centos ...
  • 故事背景:我們公司是做新零售的,需要對發佈的每台機器進行文件的同步更新,所以我這裡做了一個小小的調研 技術調研:linux之間同步文件有兩種方式rsync與scp。 sync和scp在文件夾均不存在時,執行時間相差不大,但是文件夾存在的情況下差異很大。原因是scp是複製:若mas2文件不存在則新建, ...
  • 環境:centos7 nginx1.16.1(源碼安裝) 一、下載nginx源碼包 地址:http://nginx.org/en/download.html Mainline version(主線版本)Stable version(穩定版本)Legacy versions(傳統老版本) 下載穩定版: ...
  • xcrun: error 在終端輸入 git clone *****後,提示: xcrun: error: invalid active developer path (/Library/Developer/CommandLineTools), missing*****,解決方法,直接在終端輸入以下 ...
  • 括弧的種類 小括弧,圓括弧 ( ) 中括弧,方括弧 [ ] 大括弧、花括弧 { } 一、單小括弧 () 1.另開命令組——小括弧中的命令將會新開啟一個子shell獨立順序運行,所以括弧中的變數不能夠被腳本餘下的部分使用。括弧中多個命令之間用分號隔開,最後一個命令不需要分號,各命令和括弧之間無空格。 ...
  • chrony軟體使用說明 chrony簡介 chrony是一個開源的自由軟體,它能保持系統時鐘與時間伺服器(ntp)同步,讓時間保持精確。 它由兩個程式組成:chrongd和chronyc。 chronyd是一個後臺運行的守護進程,用於調整內核運行的系統時鐘和時間伺服器同步。 它確定電腦增減時間的 ...
一周排行
    -Advertisement-
    Play Games
  • 概述:在C#中,++i和i++都是自增運算符,其中++i先增加值再返回,而i++先返回值再增加。應用場景根據需求選擇,首碼適合先增後用,尾碼適合先用後增。詳細示例提供清晰的代碼演示這兩者的操作時機和實際應用。 在C#中,++i 和 i++ 都是自增運算符,但它們在操作上有細微的差異,主要體現在操作的 ...
  • 上次發佈了:Taurus.MVC 性能壓力測試(ap 壓測 和 linux 下wrk 壓測):.NET Core 版本,今天計劃準備壓測一下 .NET 版本,來測試並記錄一下 Taurus.MVC 框架在 .NET 版本的性能,以便後續持續優化改進。 為了方便對比,本文章的電腦環境和測試思路,儘量和... ...
  • .NET WebAPI作為一種構建RESTful服務的強大工具,為開發者提供了便捷的方式來定義、處理HTTP請求並返迴響應。在設計API介面時,正確地接收和解析客戶端發送的數據至關重要。.NET WebAPI提供了一系列特性,如[FromRoute]、[FromQuery]和[FromBody],用 ...
  • 原因:我之所以想做這個項目,是因為在之前查找關於C#/WPF相關資料時,我發現講解圖像濾鏡的資源非常稀缺。此外,我註意到許多現有的開源庫主要基於CPU進行圖像渲染。這種方式在處理大量圖像時,會導致CPU的渲染負擔過重。因此,我將在下文中介紹如何通過GPU渲染來有效實現圖像的各種濾鏡效果。 生成的效果 ...
  • 引言 上一章我們介紹了在xUnit單元測試中用xUnit.DependencyInject來使用依賴註入,上一章我們的Sample.Repository倉儲層有一個批量註入的介面沒有做單元測試,今天用這個示例來演示一下如何用Bogus創建模擬數據 ,和 EFCore 的種子數據生成 Bogus 的優 ...
  • 一、前言 在自己的項目中,涉及到實時心率曲線的繪製,項目上的曲線繪製,一般很難找到能直接用的第三方庫,而且有些還是定製化的功能,所以還是自己繪製比較方便。很多人一聽到自己畫就害怕,感覺很難,今天就分享一個完整的實時心率數據繪製心率曲線圖的例子;之前的博客也分享給DrawingVisual繪製曲線的方 ...
  • 如果你在自定義的 Main 方法中直接使用 App 類並啟動應用程式,但發現 App.xaml 中定義的資源沒有被正確載入,那麼問題可能在於如何正確配置 App.xaml 與你的 App 類的交互。 確保 App.xaml 文件中的 x:Class 屬性正確指向你的 App 類。這樣,當你創建 Ap ...
  • 一:背景 1. 講故事 上個月有個朋友在微信上找到我,說他們的軟體在客戶那邊隔幾天就要崩潰一次,一直都沒有找到原因,讓我幫忙看下怎麼回事,確實工控類的軟體環境複雜難搞,朋友手上有一個崩潰的dump,剛好丟給我來分析一下。 二:WinDbg分析 1. 程式為什麼會崩潰 windbg 有一個厲害之處在於 ...
  • 前言 .NET生態中有許多依賴註入容器。在大多數情況下,微軟提供的內置容器在易用性和性能方面都非常優秀。外加ASP.NET Core預設使用內置容器,使用很方便。 但是筆者在使用中一直有一個頭疼的問題:服務工廠無法提供請求的服務類型相關的信息。這在一般情況下並沒有影響,但是內置容器支持註冊開放泛型服 ...
  • 一、前言 在項目開發過程中,DataGrid是經常使用到的一個數據展示控制項,而通常表格的最後一列是作為操作列存在,比如會有編輯、刪除等功能按鈕。但WPF的原始DataGrid中,預設只支持固定左側列,這跟大家習慣性操作列放最後不符,今天就來介紹一種簡單的方式實現固定右側列。(這裡的實現方式參考的大佬 ...