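Flink: a fake temperature sensor for testing
When testing in Flink, a custom source is often needed.
Here is one example. It simulates a fake log data source using the format of a temperature sensor reading.
The code is written in Scala.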
Sensor - case class
SensorReads.scala:
package sr

/**
  * One sensor reading: sensor id, timestamp in epoch millis, temperature.
  */
case class SensorReads(id: String,
                       timestap: Long,
                       tempture: Double)
Sensor - simulated data source
SnsorSrc_4096T.scala:
package sr

import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random

/**
  * Emits one round of four readings per period.
  * The period is 4096 millis (4 * 512 + 2048).
  */
case class SnsorSrc_4096T() extends SourceFunction[SensorReads] {

  // Set to false by cancel(), which Flink calls from another thread, hence @volatile.
  @volatile var isInRunning: Boolean = true

  override def run(sourceContext: SourceFunction.SourceContext[SensorReads]): Unit = {

    val rand: Random = new Random

    // Initial temperatures of the sensors snsor_1 .. snsor_4.
    var tptNow4 =
      (1 to 4).map(
        "snsor_" + _.toString -> (23 + 16 * rand.nextGaussian))

    while (isInRunning) {
      // Each sensor drifts by one Gaussian step per round.
      tptNow4 = tptNow4.map(
        t => t._1 -> (t._2 + rand.nextGaussian))

      // One timestamp shared by all four readings of this round.
      val timeStampNow: Long = System.currentTimeMillis

      tptNow4.foreach {
        t =>
          sourceContext.collect( // O.U.T
            SensorReads(t._1, timeStampNow, t._2))
          Thread.sleep(512)
      }
      //not set, is stm

      Thread.sleep(2048)
    }

  }

  override def cancel(): Unit = isInRunning = false

}
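The temperature update inside run is a Gaussian random walk: each sensor starts near 23 and drifts by one standard-normal step per round. The following is a minimal Flink-free sketch of just that logic, so it can be tried without a cluster or the Flink dependency. The names RandomWalkSketch, initial, and step are hypothetical, and the fixed seed is an assumption added for repeatable runs (the original source uses an unseeded Random).

```scala
import scala.util.Random

// Hypothetical helper object, not part of the original project.
object RandomWalkSketch {

  // Four sensor ids, each with a starting temperature around 23 (std. dev. 16).
  def initial(rand: Random): Seq[(String, Double)] =
    (1 to 4).map(i => s"snsor_$i" -> (23 + 16 * rand.nextGaussian()))

  // One round: every sensor keeps its id and drifts by one standard-normal step.
  def step(readings: Seq[(String, Double)], rand: Random): Seq[(String, Double)] =
    readings.map { case (id, t) => id -> (t + rand.nextGaussian()) }

  def main(args: Array[String]): Unit = {
    val rand = new Random(42L) // fixed seed: an assumption, for repeatable output
    var readings = initial(rand)
    for (_ <- 1 to 3) {
      readings = step(readings, rand)
      println(readings.map { case (id, t) => f"$id=$t%.2f" }.mkString(", "))
    }
  }
}
```

Keeping the walk in pure functions like this also makes it possible to unit-test the value generation separately from the sleeping and emitting done in the source.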
Test
SnsrSrcAappli.scala:
package applis

import org.apache.flink.streaming.api.scala._
import sr._

object SnsrSrcAappli extends App {

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.addSource(SnsorSrc_4096T())
    .print("aaa")

  env.execute()

}
Because the simulated data source is a case class, it can be instantiated here without writing new.
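As a small illustration of why new can be omitted: for a case class the compiler generates a companion object with an apply method, so FakeSource() is shorthand for FakeSource.apply(). The class names below are hypothetical stand-ins, not the classes from this post, and the remark about plain classes refers to Scala 2.

```scala
// Hypothetical names for illustration only.
case class FakeSource() // case class: a companion apply() is generated
class PlainSource       // ordinary class: no generated apply() in Scala 2

object CaseClassApplyDemo {
  def main(args: Array[String]): Unit = {
    val a = FakeSource()       // shorthand for FakeSource.apply()
    val b = new PlainSource    // `new` is required here in Scala 2
    println(a)                 // case classes also get a readable toString
    println(a == FakeSource()) // and structural equality
    println(b.getClass.getSimpleName)
  }
}
```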
Output
Run in the IDEA console:
log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
aaa:3> SensorReads(snsor_1,1573556705508,30.383394411578916)
aaa:4> SensorReads(snsor_2,1573556705508,21.397405872448672)
aaa:5> SensorReads(snsor_3,1573556705508,20.598086139457727)
aaa:6> SensorReads(snsor_4,1573556705508,18.30066983735531)
aaa:7> SensorReads(snsor_1,1573556709627,30.120955223032546)
aaa:8> SensorReads(snsor_2,1573556709627,22.38746867201145)
aaa:1> SensorReads(snsor_3,1573556709627,20.45357507067989)
aaa:2> SensorReads(snsor_4,1573556709627,17.18467261133715)
aaa:3> SensorReads(snsor_1,1573556713729,31.686487593592904)
aaa:4> SensorReads(snsor_2,1573556713729,20.67106361911623)
aaa:5> SensorReads(snsor_3,1573556713729,21.27724215221553)
aaa:6> SensorReads(snsor_4,1573556713729,16.84273306583804)

Process finished with exit code -1
...
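The timestamps in the output above can be checked against the period implied by the sleeps in run. A minimal sketch of that arithmetic follows. The object name EmitPeriodCheck and its members are hypothetical, and the three timestamps are copied from the console output.

```scala
// Hypothetical helper object for checking the emit period.
object EmitPeriodCheck {

  // Shared timestamp of each of the three rounds in the console output.
  val rounds: Seq[Long] = Seq(1573556705508L, 1573556709627L, 1573556713729L)

  // Nominal period from the sleeps in run(): 4 sensors * 512 ms + 2048 ms.
  val nominalMillis: Long = 4 * 512 + 2048

  // Difference between each pair of consecutive timestamps.
  def gaps(ts: Seq[Long]): Seq[Long] =
    ts.zip(ts.tail).map { case (a, b) => b - a }

  def main(args: Array[String]): Unit = {
    println(s"nominal period: $nominalMillis ms")
    println(s"observed gaps:  ${gaps(rounds).mkString(", ")} ms")
  }
}
```

The observed gaps are 4119 ms and 4102 ms against a nominal 4096 ms. The small excess is presumably scheduling and processing overhead on top of the sleeps, though the post does not measure this.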
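If the foreach over the current temperatures (tptNow4) in SnsorSrc_4096T.scala is instead written so that each reading takes its own timestamp:
tptNow4.foreach {
  t =>
    sourceContext.collect( // O.U.T
      SensorReads(t._1, System.currentTimeMillis, t._2))
    Thread.sleep(512)
}
then the result will be: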
log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
aaa:5> SensorReads(snsor_1,1573561932216,20.427373784204445)