class SplitTempProcessor(threshold: Double) extends ProcessFunction[SensorReading, SensorReading]{
override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
if (value.temperature > threshold){
out.collect(value)
}else{
ctx.output(new OutputTag[(String, Long, Double)]("low"), (value.id, value.timestamp, value.temperature))
}
}
}
转载请注明原文地址:https://blackberry.8miu.com/read-7430.html