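Depending on whether the upstream dataset is a KeyedStream, that is, whether keyBy(...) has been applied, Flink windows are divided into Keyed Windows and Non-Keyed Windows.
If the upstream dataset is a KeyedStream (keyBy(...) was called), use the DataStream API's window() method. Elements are partitioned by key and computed in parallel across different task instances, producing one result per key.
The call sequence is as follows: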
    stream
        .keyBy(...)                     <- keyed versus non-keyed windows
        .window(...)                    <- required: "assigner"
        [.trigger(...)]                 <- optional: "trigger" (else default trigger)
        [.evictor(...)]                 <- optional: "evictor" (else no evictor)
        [.allowedLateness(...)]         <- optional: "lateness" (else zero)
        [.sideOutputLateData(...)]      <- optional: "output tag" (else no side output for late data)
        .reduce/aggregate/fold/apply()  <- required: "function"
        [.getSideOutput(...)]           <- optional: "output tag"

If the upstream dataset is not a KeyedStream (keyBy(...) was not called), use the windowAll() method. All elements are then routed to a single task in the window operator, which produces one global result.
The call sequence is as follows:
    stream
        .windowAll(...)                 <- required: "assigner"
        [.trigger(...)]                 <- optional: "trigger" (else default trigger)
        [.evictor(...)]                 <- optional: "evictor" (else no evictor)
        [.allowedLateness(...)]         <- optional: "lateness" (else zero)
        [.sideOutputLateData(...)]      <- optional: "output tag" (else no side output for late data)
        .reduce/aggregate/fold/apply()  <- required: "function"
        [.getSideOutput(...)]           <- optional: "output tag"
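To make the difference concrete, here is a minimal plain-Java sketch. It does not use Flink; the class name KeyedVsGlobal and the sample data are made up for illustration. For the same batch of elements it computes what a keyed window would produce (one sum per key) and what a non-keyed window would produce (a single global sum).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: plain Java, not Flink API.
final class KeyedVsGlobal {

    // Keyed window: one independent result per key.
    static Map<String, Long> sumPerKey(List<Map.Entry<String, Long>> elements) {
        Map<String, Long> sums = new TreeMap<>();
        for (Map.Entry<String, Long> e : elements) {
            sums.merge(e.getKey(), e.getValue(), Long::sum);
        }
        return sums;
    }

    // Non-keyed window (windowAll): a single result over all elements.
    static long sumAll(List<Map.Entry<String, Long>> elements) {
        long total = 0;
        for (Map.Entry<String, Long> e : elements) {
            total += e.getValue();
        }
        return total;
    }

    // Hypothetical sample: the elements that arrived within one window.
    static List<Map.Entry<String, Long>> sample() {
        return Arrays.<Map.Entry<String, Long>>asList(
                new SimpleEntry<>("c", 1L),
                new SimpleEntry<>("d", 2L),
                new SimpleEntry<>("c", 1L),
                new SimpleEntry<>("c", 2L),
                new SimpleEntry<>("d", 3L));
    }

    public static void main(String[] args) {
        System.out.println(sumPerKey(sample())); // {c=4, d=5}
        System.out.println(sumAll(sample()));    // 9
    }
}
```

In a real job the per-key sums can be computed by different task instances in parallel, while the global sum has to pass through one task.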
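From the perspective of business data, Flink supports four window types (tumbling, sliding, session, and global), each defined by a Flink WindowAssigner.
Tumbling windows split the stream by a fixed length of time, and the elements of different windows never overlap. Their main advantage is simplicity: only one parameter, the window size, has to be specified.
The following code shows how to use tumbling windows: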
    DataStream<T> input = ...;

    // tumbling event-time windows
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>);

    // tumbling processing-time windows
    input
        .keyBy(<key selector>)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>);

    // daily tumbling event-time windows offset by -8 hours.
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
        .<windowed transformation>(<window function>);
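The effect of the window size and the offset can be worked out by hand. The sketch below is plain Java, not Flink API; the class and method names are hypothetical. It computes the start of the tumbling window that contains a given timestamp. The formula is modeled on how windows are aligned to the epoch and should be read as an assumption, not as a quote of Flink's source; it assumes non-negative timestamps and an offset smaller in magnitude than the window size.

```java
// Illustration only: plain Java, not Flink API.
final class TumblingWindowMath {

    // Start (in ms) of the tumbling window that contains the timestamp.
    // The window then covers [start, start + size).
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    public static void main(String[] args) {
        // 5-second windows, no offset: t = 7000 ms lies in [5000, 10000).
        System.out.println(windowStart(7_000L, 0L, 5_000L)); // 5000

        // Daily windows offset by -8 hours: t = 0 (1970-01-01 00:00 UTC)
        // lies in the window that starts 8 hours earlier in UTC,
        // which is local midnight in a UTC+8 time zone.
        long day = 24 * 60 * 60 * 1000L;
        long minusEightHours = -8 * 60 * 60 * 1000L;
        System.out.println(windowStart(0L, minusEightHours, day)); // -28800000
    }
}
```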
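Sliding windows are another common window type. They extend tumbling windows with a slide interval and allow the data of different windows to overlap. Once the window size is fixed, the window does not advance by the window size as a tumbling window does; it advances by the configured slide.
How much adjacent windows overlap is determined by the window size and the slide. When the slide is smaller than the window size, windows overlap. When the slide is larger than the window size, windows are no longer contiguous and some elements may not fall into any window. When the slide equals the window size, sliding windows are simply tumbling windows.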
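The three cases can be checked with a small plain-Java sketch (not Flink API; the names are hypothetical and the assignment rule is an assumption modeled on the description above, with zero offset and non-negative timestamps). It lists every window that contains a given timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: plain Java, not Flink API.
final class SlidingWindowMath {

    // Returns the [start, end) bounds of every sliding window containing the timestamp.
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestamp - (timestamp + slide) % slide;
        // Walk backwards while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // slide < size: windows overlap, the element is in two windows.
        System.out.println(assign(7_000L, 10_000L, 5_000L).size()); // 2
        // slide == size: behaves like a tumbling window.
        System.out.println(assign(7_000L, 5_000L, 5_000L).size());  // 1
        // slide > size: t = 7000 falls into the gap between [0, 5000) and [10000, 15000).
        System.out.println(assign(7_000L, 5_000L, 10_000L).size()); // 0
    }
}
```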
The following code shows how to use sliding windows:
    DataStream<T> input = ...;

    // sliding event-time windows
    input
        .keyBy(<key selector>)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>);

    // sliding processing-time windows
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>);

    // sliding processing-time windows offset by -8 hours
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
        .<windowed transformation>(<window function>);

Session windows group the elements of a period of high activity into one window. A window is closed by the session gap: if no element arrives within the configured gap, the window is considered finished and its computation is triggered. Note that if elements keep arriving without a pause longer than the gap, the window never fires. Unlike tumbling and sliding windows, session windows have no fixed window size or slide. Only the session gap, the upper bound on inactivity, has to be defined.
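The grouping rule can be sketched in plain Java (not Flink API; class and method names are hypothetical). Each element opens a window [timestamp, timestamp + gap), and windows that overlap are merged into one session. Treating windows that merely touch as one session is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: plain Java, not Flink API.
final class SessionWindowSketch {

    // Groups timestamps into sessions and returns [start, end) per session.
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts <= last[1]) {
                // The element arrived before the session timed out: extend the session.
                last[1] = Math.max(last[1], ts + gap);
            } else {
                // No activity for longer than the gap: start a new session.
                result.add(new long[] {ts, ts + gap});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 4_000L, 30_000L, 2_000L};
        for (long[] s : sessions(timestamps, 10_000L)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
        // [1000, 14000)
        // [30000, 40000)
    }
}
```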
The following code shows how to use session windows:
    DataStream<T> input = ...;

    // event-time session windows with static gap
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>);

    // event-time session windows with dynamic gap
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withDynamicGap((element) -> {
            // determine and return session gap
        }))
        .<windowed transformation>(<window function>);

    // processing-time session windows with static gap
    input
        .keyBy(<key selector>)
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>);

    // processing-time session windows with dynamic gap
    input
        .keyBy(<key selector>)
        .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
            // determine and return session gap
        }))
        .<windowed transformation>(<window function>);
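A global window assigns all elements with the same key to a single window. Because a global window has no natural end, a custom trigger must be specified; otherwise no computation is ever performed.
The following code shows how to use global windows:
    DataStream<T> input = ...;

    input
        .keyBy(<key selector>)
        .window(GlobalWindows.create())
        .<windowed transformation>(<window function>);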
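Why the trigger matters can be seen in a plain-Java simulation (not Flink API; the class, its threshold parameter, and the fire-then-purge behavior are assumptions of this sketch). Elements are buffered per key forever, and a result only appears once a count-based trigger fires.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: plain Java, not Flink API.
final class GlobalWindowSketch {

    private final int threshold;
    private final Map<String, List<Long>> buffers = new HashMap<>();

    GlobalWindowSketch(int threshold) {
        this.threshold = threshold;
    }

    // Adds an element; returns the key's sum when the count trigger fires, else null.
    Long add(String key, long value) {
        List<Long> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() < threshold) {
            return null; // the trigger has not fired, so nothing is computed
        }
        long sum = 0;
        for (long v : buffer) {
            sum += v;
        }
        buffer.clear(); // purge after firing (a choice made by this sketch)
        return sum;
    }

    public static void main(String[] args) {
        GlobalWindowSketch window = new GlobalWindowSketch(3);
        System.out.println(window.add("c", 1)); // null
        System.out.println(window.add("d", 2)); // null
        System.out.println(window.add("c", 1)); // null
        System.out.println(window.add("c", 2)); // 4
    }
}
```

Without the threshold check, the buffer for each key would simply grow and no result would ever be emitted.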
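Flink provides four kinds of window functions: ReduceFunction, AggregateFunction, ProcessWindowFunction, and the built-in aggregations (sum, max, min, and so on).
By how they compute their result, the first three kinds of window function fall into two categories:
(1) Incremental aggregation functions: ReduceFunction and AggregateFunction. They perform well and need little storage because they compute on intermediate state: the window keeps only the intermediate result and does not cache the raw elements.
(2) Full window functions: ProcessWindowFunction and WindowFunction. They are comparatively expensive and slower because the operator has to cache every element that belongs to the window and only processes all the raw elements when the window fires.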
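The storage difference between the two categories can be illustrated with a plain-Java sketch (not Flink API; all names are hypothetical). Both variants compute the same sum, but the incremental one keeps a single running value while the full one keeps every element until the window fires.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: plain Java, not Flink API.
final class IncrementalVsFull {

    // Incremental aggregation: state is one running value, updated per element.
    static final class IncrementalSum {
        private long state;

        void add(long value) { state += value; }
        long fire() { return state; }
        int storedValues() { return 1; }
    }

    // Full window function: every element is buffered until the window fires.
    static final class FullSum {
        private final List<Long> buffer = new ArrayList<>();

        void add(long value) { buffer.add(value); }
        long fire() {
            long sum = 0;
            for (long v : buffer) {
                sum += v;
            }
            return sum;
        }
        int storedValues() { return buffer.size(); }
    }

    public static void main(String[] args) {
        IncrementalSum incremental = new IncrementalSum();
        FullSum full = new FullSum();
        for (long v = 1; v <= 1000; v++) {
            incremental.add(v);
            full.add(v);
        }
        System.out.println(incremental.fire() + " using " + incremental.storedValues() + " stored value");
        // 500500 using 1 stored value
        System.out.println(full.fire() + " using " + full.storedValues() + " stored values");
        // 500500 using 1000 stored values
    }
}
```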
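(1) Description
ReduceFunction defines how two input elements of the same type are combined by a given computation into one result element of that same type. In other words, the input type and the output type of a ReduceFunction must be identical.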
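The pairwise combination can be sketched in plain Java with java.util.function.BinaryOperator standing in for the reduce logic (this is not Flink API; the class and method names are hypothetical). Note that the element type T is also the result type.

```java
import java.util.Arrays;
import java.util.function.BinaryOperator;

// Illustration only: plain Java, not Flink API.
final class ReduceSketch {

    // Folds the elements of one window pairwise; input and output share type T.
    static <T> T reduceWindow(Iterable<T> elements, BinaryOperator<T> reducer) {
        T accumulated = null;
        for (T element : elements) {
            // The first element becomes the initial state; later ones are combined into it.
            accumulated = (accumulated == null) ? element : reducer.apply(accumulated, element);
        }
        return accumulated;
    }

    public static void main(String[] args) {
        BinaryOperator<Long> sum = (a, b) -> a + b;
        System.out.println(reduceWindow(Arrays.asList(1L, 1L, 2L), sum)); // 4
    }
}
```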
(2) Example
Given the following key-value pairs as input, sum the values per key:
    [root@localhost ~]# nc -lk 8888
    c 1
    d 2
    c 1
    c 2
    d 3

The complete code is as follows:
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class TestFlinkOperator {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Read "key value" lines from the socket
            DataStreamSource<String> dataStream = env.socketTextStream("122.114.85.238", 8888);

            dataStream
                .map(new MyMapFunction())
                .keyBy(0)
                // Tumbling window: aggregate the elements received in each 5-second window
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // Sum the values per key
                .reduce((v1, v2) -> Tuple2.of(v1.f0, v1.f1 + v2.f1))
                .print();

            env.execute();
        }
    }

    /**
     * Converts an input line into a 2-tuple (key, value).
     */
    class MyMapFunction implements MapFunction<String, Tuple2<String, Long>> {

        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<String, Long> map(String value) throws Exception {
            String[] split = value.split(" ");
            return Tuple2.of(split[0], Long.valueOf(split[1]));
        }
    }

Output:
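    2> (c,4)
    3> (d,5)

(1) Description
Like ReduceFunction, AggregateFunction is an incremental function that computes its result from intermediate state, but it is more general for window computation. Unlike ReduceFunction, its input type and output type may differ. The AggregateFunction interface is therefore more flexible than ReduceFunction, at the cost of a more complex implementation.
The AggregateFunction interface defines four methods to override: createAccumulator() creates an accumulator, add() defines how an element is added to it, getResult() defines how the result is computed from the accumulator, and merge() defines how two accumulators are merged.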
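To show how the four methods cooperate, and how the input, accumulator, and output types can all differ, here is a plain-Java sketch that computes an average. The Aggregator interface below is a hypothetical stand-in that only mirrors the four method names listed above; it is not Flink's interface.

```java
// Illustration only: plain Java, not Flink API.
final class AggregateSketch {

    // Hypothetical interface mirroring the four methods described above.
    interface Aggregator<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Average: input Long, accumulator {sum, count}, output Double.
    static final class Average implements Aggregator<Long, long[], Double> {
        @Override
        public long[] createAccumulator() {
            return new long[] {0L, 0L};
        }

        @Override
        public long[] add(Long value, long[] acc) {
            return new long[] {acc[0] + value, acc[1] + 1};
        }

        @Override
        public Double getResult(long[] acc) {
            return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
        }

        @Override
        public long[] merge(long[] a, long[] b) {
            return new long[] {a[0] + b[0], a[1] + b[1]};
        }
    }

    public static void main(String[] args) {
        Average avg = new Average();
        // Two partial accumulators, for example from two windows being merged.
        long[] left = avg.add(3L, avg.add(1L, avg.createAccumulator()));
        long[] right = avg.add(5L, avg.createAccumulator());
        System.out.println(avg.getResult(avg.merge(left, right))); // 3.0
    }
}
```

Only the two-element accumulator is kept as state, so the computation stays incremental even though the result type differs from the input type.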
(2) Example
Given the following key-value pairs as input, sum the values per key:
    [root@localhost ~]# nc -lk 8888
    c 1
    d 2
    c 1
    c 2
    d 3

The complete code is as follows:
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class TestFlinkOperator {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Read "key value" lines from the socket
            DataStreamSource<String> dataStream = env.socketTextStream("122.114.85.238", 8888);

            dataStream
                .map(new MyMapFunction())
                .keyBy(0)
                // Tumbling window: aggregate the elements received in each 5-second window
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // MyAggregateFunction computes the sum; OutResultWindowFunction emits it with its key
                .aggregate(new MyAggregateFunction(), new OutResultWindowFunction())
                .print();

            env.execute();
        }
    }

    /**
     * Converts an input line into a 2-tuple (key, value).
     */
    class MyMapFunction implements MapFunction<String, Tuple2<String, Long>> {

        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<String, Long> map(String value) throws Exception {
            String[] split = value.split(" ");
            return Tuple2.of(split[0], Long.valueOf(split[1]));
        }
    }

    /**
     * Incrementally sums the values.
     */
    class MyAggregateFunction implements AggregateFunction<Tuple2<String, Long>, Long, Long> {

        private static final long serialVersionUID = -4705678584168230654L;

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Tuple2<String, Long> value, Long accumulator) {
            return accumulator + value.f1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    /**
     * Emits the aggregated result together with its key.
     */
    class OutResultWindowFunction implements WindowFunction<Long, Tuple2<String, Long>, Tuple, TimeWindow> {

        private static final long serialVersionUID = -153402576910925970L;

        @Override
        public void apply(Tuple key, TimeWindow window, Iterable<Long> input,
                Collector<Tuple2<String, Long>> out) throws Exception {
            // input holds exactly one element: the result of MyAggregateFunction
            out.collect(Tuple2.of(key.getField(0), input.iterator().next()));
        }
    }

Output:
    2> (c,4)
    3> (d,5)

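(1) Description
ReduceFunction and AggregateFunction, described above, are window functions that compute incrementally on intermediate state. They cover the vast majority of scenarios, but some more complex metrics depend on all the elements of a window, or need access to the window's state and metadata. This is where ProcessWindowFunction is needed: it flexibly supports computing a result from all elements of a window. Sorting the whole window to obtain the top N, for example, requires ProcessWindowFunction.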
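The top-N case shows why all elements are needed: the ranking is only known once the whole window has been seen. The following plain-Java sketch (not Flink API; the names are hypothetical) does what the body of such a function would do with the window's elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Illustration only: plain Java, not Flink API.
final class TopNSketch {

    // Returns the n largest values of one window, largest first.
    static List<Long> topN(Iterable<Long> windowElements, int n) {
        // Copy every element of the window; this is the buffering cost of a full window function.
        List<Long> all = new ArrayList<>();
        for (Long value : windowElements) {
            all.add(value);
        }
        all.sort(Comparator.reverseOrder());
        return new ArrayList<>(all.subList(0, Math.min(n, all.size())));
    }

    public static void main(String[] args) {
        System.out.println(topN(Arrays.asList(85L, 80L, 76L, 90L), 2)); // [90, 85]
    }
}
```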
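(2) Example
Given the following scores as input (一班 is Class 1, 二班 is Class 2), sort the scores entered in each window by class, in descending order of score:
    [root@localhost ~]# nc -lk 8888
    二班 wm 100
    一班 lisi 85
    一班 ww 80
    一班 zs 76
    一班 ml 90
    二班 zs 76
    二班 ml 91

The complete code is as follows: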
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class TestFlinkOperator {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Read "class name score" lines from the socket
            DataStreamSource<String> dataStream = env.socketTextStream("122.114.85.238", 8888);

            dataStream
                .map(new MyMapFunction())
                // Key by class (field 0)
                .keyBy(0)
                // Tumbling window: collect the elements received in each 5-second window
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // Custom sorting logic
                .process(new MyProcessWindowFunction())
                .print();

            env.execute();
        }
    }

    class MyProcessWindowFunction extends
            ProcessWindowFunction<Tuple3<String, String, Long>, List<Tuple3<String, String, Long>>, Tuple, TimeWindow> {

        private static final long serialVersionUID = 1L;

        @Override
        public void process(
                Tuple key,
                Context context,
                Iterable<Tuple3<String, String, Long>> elements,
                Collector<List<Tuple3<String, String, Long>>> out) throws Exception {
            // Copy all elements of the window into a list
            List<Tuple3<String, String, Long>> list = new ArrayList<>();
            for (Tuple3<String, String, Long> element : elements) {
                list.add(element);
            }
            // Descending by score; swap o1 and o2 for ascending order
            list.sort((o1, o2) -> o2.f2.compareTo(o1.f2));
            out.collect(list);
        }
    }

    /**
     * Converts an input line into a 3-tuple (class, name, score).
     */
    class MyMapFunction implements MapFunction<String, Tuple3<String, String, Long>> {

        private static final long serialVersionUID = 1L;

        @Override
        public Tuple3<String, String, Long> map(String value) throws Exception {
            String[] split = value.split(" ");
            return Tuple3.of(split[0], split[1], Long.valueOf(split[2]));
        }
    }

Output:
    2> [(二班,wm,100), (二班,ml,91), (二班,zs,76)]
    4> [(一班,ml,90), (一班,lisi,85), (一班,ww,80), (一班,zs,76)]

Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/stream/operators/windows.html
