Flink认为Batch是Streaming的一个特例,所以Flink底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口(window)就是从Streaming到Batch的一个桥梁,Flink提供了非常完善的窗口的机制,这是我认为的Flink最大亮点之一(其他的亮点包括消息乱序处理和checkpoint机制)。本文介绍流处理中窗口概念。
在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
窗口可以是时间驱动(Time Window,例如:每30秒钟),也可以是数据驱动(Count Window,例如:每一百个元素)。一种经典的窗口分类可以分成:翻滚窗口(Tumbing Window,无重叠),滚动窗口(Sliding Window,有重叠),会话窗口(Session Window,活动间隙)。
举了具体场景来形象地理解不同窗口概念,假设,淘宝网会记录每个用户每次购买的商品个数,我们要做的是统计不同窗口中用户购买商品的总数。
图片来源于网络上图中,raw data stream代表用户的购买行为流,图中数字代表该用户本次购买商品个数,事件是按时间分布的,所以可以看出事件之间是有time gap的。
根据时间对数据流进行分组的。这里我们涉及到流处理的时间问题,时间问题和消息乱序问题紧密相连,这是流处理中现存的难题之一,后续会对消息乱序处理的深入探讨,在本文中需要知道Flink提了三种时间概念,分别是event time(事件时间:事件发生的时间),ingestion time(摄取时间:事件进入流处理系统时间),processing time(处理时间:消息被计算机处理的时间)。Flink中窗口机制和时间类型是完全解耦的,也就是说当需要改变时间类型时不需要更改窗口逻辑相关的代码。
Tumbling Time Window
如上图,我们需要统计每一分钟中用户购买商品总数,需要将用户的行为事件按每一分钟进行切分,这种切分被成为翻滚时间窗口(Tumbling Time Window)。翻滚窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。通过使用 DataStream API,我们可以这样实现:
// Stream of (userId, buyCnt) val buyCnts: DataStream[(Int, Int)] = ... val tumblingCnts: DataStream[(Int, Int)] = buyCnts // key stream by userId .keyBy(0) // tumbling time window of 1 minute length .timeWindow(Time.minutes(1)) // compute sum over buyCnt .sum(1)Sliding Time Window
但是对于某些应用,它们需要的窗口是不间断的,需要平滑地进行进行窗口聚合。比如,我们可以每30秒计算一次最近一分钟用户购买商品总数。这种窗口我们称滑动时间窗口(Sliding Time Window)。在滑动窗口中,一个元素可以对应多个窗口。
val slidingCnts: DataStream[(Int, Int)] = buyCnts .keyBy(0) // sliding time window of 1 minute length and 30 secs trigger interval .timeWindow(Time.minutes(1), Time.seconds(30)) .sum(1)Count Window是根据元素个数对数据流进行分组的。
Tumbling Count Window
当我们想要每个100用户购买行为事件统计购买总数,那么每当窗口填满100个元素,就会窗口进行计算,这种窗口我们称之为翻滚计数窗口(Tumbling Count Window),上图所示窗口大小3个。
// Stream of (userId, buyCnts) val buyCnts: DataStream[(Int, Int)] = ... val tumblingCnts: DataStream[(Int, Int)] = buyCnts // key stream by sensorId .keyBy(0) // tumbling count window of 100 elements size .countWindow(100) // compute the buyCnt sum .sum(1)Sliding Count Window
当然Count Window也支持Sliding Window,虽在上图中未描述出来,但和Sliding Time Window含义是类似的,例如计算每10个元素计算一次最近100个元素的总和,代码示例如下。
val slidingCnts: DataStream[(Int, Int)] = vehicleCnts .keyBy(0) // sliding count window of 100 elements size and 10 elements trigger interval .countWindow(100, 10) .sum(1)在这种用户交互事件流中,我们首先想到的是将事件聚合到会话窗口中(一段用户持续活跃的周期),由非活跃的间隙分隔开。如上图所示,就是需要计算每个用户在活跃期间总共购买的商品数量,如果用户30秒没有活动则视为会话断开(假设raw data stream是单个用户的购买行为流)。Session Window 的示例代码如下:
// Stream of (userId, buyCnts) val buyCnts: DataStream[(Int, Int)] = ... val sessionCnts: DataStream[(Int, Int)] = vehicleCnts .keyBy(0) // session window based on a 30 seconds session gap interval .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))) .sum(1)一般而言,window 是在无限的流上定义了一个有限的元素集合。这个集合可以是基于时间的,元素个数的,时间和个数结合的,会话间隙的,或者是自定义的。Flink 的 DataStream API 提供了简洁的算子来满足常用的窗口操作,同时提供了通用的窗口机制来允许用户自己定义窗口分配逻辑。下面我们会对 Flink 窗口相关的 API 进行剖析。
得益于 Flink Window API 松耦合设计,我们可以非常灵活地定义符合特定业务的窗口。Flink 中定义一个窗口主要需要以下三个组件。
Window Assigner
该组件主要功能是决定数据该分发到哪个窗口,它的作用可以类比于SpringMVC 中Dispatcher。
上图左侧是 Flink 窗口实现的包结构,三大组件在对应的目录下,清晰明了。
底部是 Window Assigner 的继承类,在调用 WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) 这类方法时,可以传入上述的窗口分发器,在里面实现自定义的窗口分发逻辑。
Trigger
每个窗口都有一个触发器,该触发器决定何时评估或清除该窗口。
对于每个插入到窗口中的元素以及先前注册计时器超时时,该触发该触发器。
上图展示了触发器的继承类,从中可以看出,它可以根据时间或者计数来触发,表示这个窗口的数据已收集完成,可以触发这个计算逻辑。
Evictor
直译为 ‘驱逐者’,作用类似于过滤器 fliter,在 trigger 后执行,如果设定了 evictor,将会去除不符合条件的数据(默认是不设定的,不会驱逐)。
通过这三大组件,可以实现自定义窗口逻辑,决定数据如何分配、何时触发计算以及哪些数据要被提前去除,详细使用例子可以参考官网的示例:https://flink.apache.org/news/2015/12/04/Introducing-windows.html
下图描述Flink窗口机制以及各个组件之间如何相互工作。
图片来源官网首先上图中组件都位于一个算子(window operator)中,数据流源源不断地进入算子,每一个到达的元素都会交给WindowAssigner。WindowAssigner会决定元素被放到哪个或哪些窗口(window),可能会创建新窗口。因为一个元素可以被放入多个窗口中,所以同时存在多个窗口的可能。
注意
Window本身只是一个ID标识符,其内部可能存储了一些元数据,如TimeWindow中有开始和结束时间,但是并不会存储窗口的元素。
窗口元素实际存储在Key/Value State中,Key为Window,Value为元素集合(或聚合值)。为了保证窗口的容错性,该实现依赖Flink的State机制。
Trigger
每一个窗口都拥有一个属于自己的Trigger,Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除。每当有元素加入该窗口,或者之前注册定时器超时了,那么Trigger都被调用。Trigger的返回结果可以是continue(不做任何操作),fire(处理窗口数据)
,purge(移除窗口和窗口中数据),或者fire+purge。
一个Trigger的调用结果只是fire的话,那么会计算窗口并保留窗口原样,也就是说窗口中的数据仍然保留不变,等待下次Trigger fire的时候再次执行计算。一个窗口可以被重复计算多次知道它被 purge 了。在purge之前,窗口会一直占用着内存。
Evictor
当Trigger fire了,窗口中元素集合就会交给Evictor(如果指定)。Evictor 主要用来遍历窗口中的元素列表,并决定最先进入窗口的多少个元素需要被移除。剩余的元素会交给用户指定的函数进行窗口的计算。如果没有 Evictor 的话,窗口中的所有元素会一起交给函数进行计算。
计算函数收到了窗口的元素(可能经过了 Evictor 的过滤),并计算出窗口的结果值,并发送给下游。窗口的结果值可以是一个也可以是多个。DataStream API 上可以接收不同类型的计算函数,包括预定义的sum(),min(),max(),还有 ReduceFunction,FoldFunction,还有WindowFunction。WindowFunction 是最通用的计算函数,其他的预定义的函数基本都是基于该函数实现的。
优化
Flink 对于一些聚合类的窗口计算(如sum,min)做了优化,因为聚合类的计算不需要将窗口中的所有数据都保存下来,只需要保存一个result值就可以了。每个进入窗口的元素都会执行一次聚合函数并修改result值。这样可以大大降低内存的消耗并提升性能。但是如果用户定义了 Evictor,则不会启用对聚合窗口的优化,因为 Evictor 需要遍历窗口中的所有元素,必须要将窗口中所有元素都存下来。
传参
用nc模拟下传输数据,可以看到滑动窗口统计单词计数。
参考
http://wuchong.me/blog/2016/05/25/flink-internals-window-mechanism/
公众号
名称:大数据计算 微信号:bigdata_limeng