1、Map [DataStream->DataStream]
(1)说明
调用用户定义的MapFunction对DataStream[T]数据进行处理,形成新的Data-Stream[T],其中数据格式可能会发生变化,常用作对数据集内数据的清洗和转换。例如将输入数据集中的每个数值全部加 1 处理,并且将数据输出到下游数据集
2、FlatMap [DataStream->DataStream]
(1)说明
该算子主要应用处理输入一个元素产生一个或者多个元素的计算场景, 比较常见的是在经典例子 WordCount 中,将每一行的文本数据切割,生成单词序列。如下图所示,对于输入DataStream[String]通过 FlatMap 函数进行处理, 字符串数字按逗号切割, 然后形成新的整数数据集。
3、Filter [DataStream->DataStream]
(1)说明
该算子将按照条件对输入数据集进行筛选操作, 将符合条件的数据集输出, 将不符合条件的数据过滤掉。如下图所示将输入的奇数从数据集中过滤掉
4、KeyBy [DataStream->KeyedStream]
(1)说明
该算子根据指定的Key将输入的DataStream[T]数据格式转换为KeyedStream[T],也就是在数据集中执行Partition操作,将相同的Key值的数据放置在相同的分区中。如下图所示, 将集合中颜色相同的方块按照key分方在同一区域
(2)举例
上图中实现如下:
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; import org.apache.flink.util.Collector; public class TestFlinkOperator { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Tuple2<Long,Long>> stream = env.fromElements( Tuple2.of(1L, 1L), Tuple2.of(1L, 5L), Tuple2.of(1L, 0L), Tuple2.of(2L, 2L), Tuple2.of(2L, 3L) ); stream.keyBy(0).print(); env.execute(); } }输出
3> (1,1) 3> (1,5) 3> (1,0) 4> (2,2) 4> (2,3)5、Reduce [KeyedStream->DataStream]
(1)说明
该算子和MapReduce中Reduce原理基本一致,主要目的是将输入的KeyedStream通过传入的用户自定义的ReduceFunction滚动地进行数据聚合处理,其中定义的ReduceFunciton必须满足运算结合律和交换律。如下代码对传入keyedStream数据集中相同的key值的数据独立进行求和运算,得到每个key所对应的求和值。
(2)举例
例如,按key分组对value进行累加
import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class TestFlinkOperator { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Tuple2<Long,Long>> textDataSteam = env.fromElements( Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(2L, 2L) ); SingleOutputStreamOperator<Tuple2<Long, Long>> dataSteam = textDataSteam .flatMap(new MyFlatMapFunction()) .keyBy(0).reduce(new MyReduceMapFunction()); dataSteam.print(); env.execute(); } } class MyFlatMapFunction extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { private static final long serialVersionUID = -6478853684295335571L; @Override public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { out.collect(Tuple2.of(value.f0, value.f1)); } } class MyReduceMapFunction implements ReduceFunction<Tuple2<Long, Long>> { private static final long serialVersionUID = -6478853684295335571L; @Override public Tuple2<Long, Long> reduce(Tuple2<Long, Long> value1, Tuple2<Long, Long> value2) throws Exception { return Tuple2.of(value1.f0, value1.f1 + value2.f1); } }输出
3> (1,5) 3> (1,8) 3> (1,12) 3> (1,19) 4> (2,2)6、Aggregations[KeyedStream->DataStream]
(1)说明
Aggregations是KeyedDataStream 接口提供的聚合算子,根据指定的字段进行聚合操作,滚动地产生一系列数据聚合结果。其实是将 Reduce算子中的函数进行了封装,封装的聚合操作有sum、min、miniBy、max、maxBy等,这样就不需要用户自己定义 Reduce 函数。
(2)举例
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; import org.apache.flink.util.Collector; public class TestFlinkOperator { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Tuple2<Long,Long>> stream = env.fromElements( Tuple2.of(1L, 3L), Tuple2.of(1L, 8L), Tuple2.of(1L, 5L), Tuple2.of(2L, 6L), Tuple2.of(2L, 7L) ); stream.keyBy(0).sum(1).print(); stream.keyBy(0).min(1).print("min"); stream.keyBy(0).minBy(1).print("minby"); stream.keyBy(0).max(1).print("max"); stream.keyBy(0).maxBy(1).print("maxby"); env.execute(); } }7、Union[DataStream ->DataStream]
(1)说明
Union 算子主要是将两个或者多个输入的数据集合并成一个数据集,需要保证两个数据集的格式一致,输出的数据集的格式和输入的数据集格式保持一致。
(2) 举例
import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class TestFlinkOperator { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Tuple2<Long,Long>> stream1 = env.fromElements( Tuple2.of(1L, 3L), Tuple2.of(1L, 5L) ); DataStreamSource<Tuple2<Long,Long>> stream2 = env.fromElements( Tuple2.of(1L, 7L), Tuple2.of(1L, 4L) ); DataStreamSource<Tuple2<Long,Long>> stream3 = env.fromElements( Tuple2.of(2L, 2L) ); //合并一个或多个DataStream数据集 stream1.union(stream2,stream3).print(); env.execute(); } }输出
4> (1,3) 2> (1,4) 1> (1,5) 1> (1,7) 3> (2,2)8、Connect ,CoMap ,CoFlatMap[DataStream ->ConnectedStream->DataStream]
(1)说明
Connect算子主要是为了合并两种或者多种不同数据类型的数据集,合并后会保留原来数据集的数据类型。例如:dataStream1数据集为(Long,Long)元祖类型,dataStream2数据集为Long类型,通过connect连接算子将两个不同数据类型的流结合在一起,形成格式为ConnectedStreams的数据集,其内部数据为[(Long,Long),Int]的混合数据类型,保留了两个原始数据集的数据类型。
(2)举例
将两个不同类型的流stream1与stream2连接在一起,为connectedStreams,再用CoMap或者CoFlatMap将连接的connectedStreams流转换成同一类型的流,这样就合并了两个不同类型的流为同一类型。
import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.ConnectedStreams; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; import org.apache.flink.util.Collector; public class TestFlinkOperator { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Tuple2<Long,Long>> stream1 = env.fromElements( Tuple2.of(1L, 3L), Tuple2.of(1L, 5L) ); DataStreamSource<Long> stream2 = env.fromElements( 2L,3L ); //将stream1与stream2连接在一起,为connectedStreams ConnectedStreams<Tuple2<Long, Long>, Long> connectedStreams = stream1.connect(stream2); //将连接的connectedStreams转换成同一类型的流 connectedStreams.flatMap(new MyCoFlatMapFunction()).print(); env.execute(); } } class MyCoFlatMapFunction implements CoFlatMapFunction<Tuple2<Long,Long>, Long, Tuple2<Long, Long>> { private static final long serialVersionUID = -6478853684295335571L; @Override public void flatMap1(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { out.collect(value);//stream1 } @Override public void flatMap2(Long value, Collector<Tuple2<Long, Long>> out) throws Exception { out.collect(Tuple2.of(value, 0L));//stream2转换成/stream1类型 } }输出
4> (1,5) 3> (1,3) 3> (3,0) 2> (2,0)
9、Split 和 和 select [DataStream->SplitStream->DataStream]
(1)说明
Split算子是将一个DataStream数据集按照条件进行拆分,形成两个数据集的过程,也是union算子的逆向实现。每个接入的数据都会被路由到一个或者多个输出数据集中。
(2)举例
将二元组中key为1和2的拆分为两个流
import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; import org.apache.flink.util.Collector; public class TestFlinkOperator { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Tuple2<Long,Long>> stream = env.fromElements( Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(2L, 6L) ); SplitStream<Tuple2<Long, Long>> splitStream = stream.split(new OutputSelector<Tuple2<Long,Long>>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> select(Tuple2<Long, Long> value) { List<String> list = new ArrayList<>(); if(value.f0==1L) { list.add("1"); } else { list.add("2"); } return list; } }); splitStream.select("1").print("流1:"); splitStream.select("2").print("流2:"); env.execute(); } }输出
流2::1> (2,6) 流1::3> (1,3) 流1::4> (1,5)
参考:https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/stream/operators/