[flink] 使用ProcessFunction实现数据分流、格式转换

    科技2022-07-14  123

    /** 需求: 在process算子中定义一个ProcessFunction,使大于输入参数的数据输出到主流,不大于的数据输出到侧输出流 */ // ProcessFunction的泛型为<I,O>,O为主流的数据类型 class SplitTempProcessor(threshold: Double) extends ProcessFunction[SensorReading, SensorReading]{ override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = { if (value.temperature > threshold){ // 若温度超过阈值,输出到主流 out.collect(value) }else{ // 若温度不超过阈值,输出到侧输出流 ctx.output(new OutputTag[(String, Long, Double)]("low"), (value.id, value.timestamp, value.temperature)) } } }
    Processed: 0.014, SQL: 8