[flink] ProcessFunction应用示例
需求:对于DataStream[SensorReading]数据流,若某个sensor的温度在10秒之内连续上升则报警
/**
 * Keyed process function that emits a warning when the temperature of one key
 * (the key is printed as the sensor identifier in the warning text) has not
 * dropped for `interval` of processing time after it started to rise.
 *
 * Mechanism: the first reading that is higher than the previous one registers a
 * processing-time timer `interval` in the future; any reading lower than the
 * previous one deletes that timer. If the timer fires, the warning is emitted.
 *
 * @param interval length of the observation window; it is added to the current
 *                 processing time and divided by 1000 to print seconds, i.e. it
 *                 is treated as milliseconds
 */
class TempIncreWarning(interval: Long)
    extends KeyedProcessFunction[String, SensorReading, String] {

  /** Temperature of the previous reading for the current key. */
  lazy val lastTempState: ValueState[Double] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[Double]("last-temp", classOf[Double]))

  /** Timestamp of the currently registered timer; a value of 0 is treated as "no timer". */
  lazy val timerTsState: ValueState[Long] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("timer-ts", classOf[Long]))

  /**
   * Compares the incoming reading with the previous one and registers or
   * cancels the warning timer accordingly.
   *
   * @param value the incoming reading
   * @param ctx   context giving access to the timer service
   * @param out   collector (unused here; output is produced in `onTimer`)
   */
  override def processElement(
      value: SensorReading,
      ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
      out: Collector[String]): Unit = {
    // Read the previous values before overwriting the stored temperature.
    // NOTE(review): for the first reading of a key the unset state presumably
    // reads as 0.0, so any positive first temperature counts as "rising" and
    // starts a timer - confirm whether that is intended.
    val lastTemp = lastTempState.value
    val timerTs = timerTsState.value
    lastTempState.update(value.temperature)

    if (value.temperature > lastTemp && timerTs == 0) {
      // Temperature rose and no timer is pending: start the observation window.
      val ts = ctx.timerService().currentProcessingTime() + interval
      ctx.timerService().registerProcessingTimeTimer(ts)
      timerTsState.update(ts)
    } else if (value.temperature < lastTemp) {
      // Temperature dropped: cancel the pending timer and forget it.
      // NOTE(review): an equal temperature neither starts nor cancels a timer.
      ctx.timerService().deleteProcessingTimeTimer(timerTs)
      timerTsState.clear()
    }
  }

  /**
   * Called when the registered timer fires, i.e. no drop was seen during the
   * window. Emits the warning text and clears the stored timer timestamp so a
   * later rise can register a new timer.
   *
   * @param timestamp timestamp of the firing timer
   * @param ctx       timer context; used to read the current key
   * @param out       collector receiving the warning text
   */
  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,
      out: Collector[String]): Unit = {
    out.collect(s"传感器${ctx.getCurrentKey}的温度连续${interval / 1000}秒连续上升")
    timerTsState.clear()
  }
}