我们以MapFunction为例,来分析下这些xxxFunction的规律
可以看到new MapFunction后面的
另外注意,这里在这里敲入new MapFunction的时候,必须在等号左侧已经书写了DataStream<Tuple2<String,Integer>>result
否则会导致intellij在MapFunction中动态补充的代码中出现
MapFunction<String, Object>()
而不是MapFunction<String, Tuple2<String, Integer>>()
完整代码如下:
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; import org.apache.flink.api.common.functions.ReduceFunction; import java.io.File; public class WindowReduce { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<String> dataStream = env.socketTextStream("Desktop", 9999); // map的作用是 //返回的是什么,输入的是什么 DataStream<Tuple2<String,Integer>>result=dataStream.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String s) throws Exception { return new Tuple2<String, Integer>(s,1) ; } }); } }
