[flink] ProcessFunction应用示例

    科技2022-07-13  115

    [flink] ProcessFunction应用示例

    需求:对于DataStream[SensorReading]数据流,若某个sensor的温度在10秒之内连续上升则报警

    /** 需求: 对于DataStream[SensorReading]数据流,若某个sensor的温度在10秒之内连续上升则报警 * 分析: 保存两个状态,上一条数据的温度值(用来和本条数据比较),之前开始统计的定时器的时间戳 */ class TempIncreWarning(interval: Long) extends KeyedProcessFunction[String, SensorReading, String]{ // 定义状态,保存上一个温度值进行比较,保存注册定时器的时间戳用于删除定时器 lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-temp", classOf[Double])) lazy val timerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer-ts", classOf[Long])) // KeyedProcessFunction的K,I,O泛型为ID(String),SensorReading,String(报警信息) override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = { // 先取出保存的状态 val lastTemp = lastTempState.value val timerTs = timerTsState.value lastTempState.update(value.temperature) // 当前温度值和上次温度值比较 if (value.temperature > lastTemp && timerTs == 0){ // 如果温度上升,且没有定时器,就注册当前时间10s之后的定时器 val ts = ctx.timerService().currentProcessingTime() + interval ctx.timerService.registerProcessingTimeTimer(ts) timerTsState.update(ts) } else if (value.temperature < lastTemp){ // 若温度下降,删除定时器 ctx.timerService.deleteProcessingTimeTimer(timerTs) timerTsState.clear() } } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimer, out: Collector[String]): Unit = { // 若能触发定时器,说明10s之内肯定没被删除,意味着这10s内的数据符合报警条件,直接输出就完事 // 利用out进行输出 out.collect("传感器" + ctx.getCurrentKey + "的温度连续" + interval/1000 + "秒连续上升") // 清空定时器 timerTsState.clear() } }
    Processed: 0.011, SQL: 8