
2.3 窗口
本节主要介绍窗口的原理及实现,帮助读者深入了解Flink中的窗口是怎么实现的。
2.3.1 窗口的基本概念
窗口是无边界流式系统中非常重要的概念,窗口把数据切分成一段段有限的数据集,然后进行计算。Flink中窗口按照是否并发执行,分为Keyed Window和Non-Keyed Window,它们的主要区别是有无keyBy动作。Keyed Window可以按照指定的分区方式并发执行,所有相同的键会被分配到相同的任务上执行。Non-Keyed Window会把所有数据放到一个任务上执行,并发度为1。我们来看下窗口的相关API。
1. Keyed Window
stream.keyBy(...) .window(...) // 接受WindowAssigner参数,用来分配窗口 [.trigger(...)] // 可选的,接受Trigger类型参数,用来触发窗口 [.evictor(...)] // 可选的,接受Evictor类型参数,用来驱逐窗口中的数据 [.allowedLateness(...)] // 可选的,接受Time类型参数,表示窗口允许的最大延迟,超过该延迟,数据会被丢弃 [.sideOutputLateData(...)] // 可选的,接受OutputTag类型参数,用来定义抛弃数据的输出 .reduce/aggregate/fold/apply() // 窗口函数 [.getSideOutput(...)] // 可选的,获取指定的DataStream
2. Non-Keyed Window
stream.windowAll(...) // 接受WindowAssigner参数,用来分配窗口 [.trigger(...)] // 可选的,接受Trigger类型参数,用来触发窗口 [.evictor(...)] // 可选的,接受Evictor类型参数,用来驱逐窗口中的数据 [.allowedLateness(...)] // 可选的,接受Time类型参数,表示窗口允许的最大延迟,超过该延迟,数据会被丢弃 [.sideOutputLateData(...)] // 可选的,接受OutputTag类型参数,用来定义抛弃数据的输出 .reduce/aggregate/fold/apply() // 窗口函数 [.getSideOutput(...)] // 可选的,获取指定的DataStream
因为实际生产中我们大多会使用Keyed Window,所以后续章节的解读都是针对Keyed Window展开的。我们来看下上面提到的几个主要概念。
- WindowAssigner:窗口分配器。我们常说的滚动窗口、滑动窗口、会话窗口等就是由WindowAssigner决定的,比如TumblingEventTimeWindows可以产生基于事件时间的滚动窗口。
- Trigger:触发器。Flink根据WindowAssigner把数据分配到不同的窗口,还需要一个执行时机,Trigger就是用来判断执行时机的。Trigger类中定义了一些返回值类型,根据返回值类型来决定是否触发及触发什么动作。
- Evictor:驱逐器。在窗口触发之后,在调用窗口函数之前或者之后,Flink允许我们定制要处理的数据集合,Evictor就是用来驱逐或者过滤不需要的数据集的。
- Allowed Lateness:最大允许延迟。主要用在基于事件时间的窗口,表示在水位线到达之后的最长允许数据延迟时间。在最长允许延迟时间内,窗口都不会销毁。
- Window Function:窗口函数。用户代码执行函数,用来做真正的业务计算。
- Side Output:丢弃数据的集合。通过getSideOutput方法可以获取丢弃数据的DataStream,方便用户进行扩展。
以上就是窗口的一些主要概念,接下来我们深入分析窗口的每个元素。
2.3.2 窗口的执行流程
在深入介绍窗口之前,我们先从整体上看下窗口的执行过程,以便有个全局的概念。本节从整体上介绍窗口的执行流程,如果其中有细节不清楚的地方,可以绕过本节,直接看后面几节,再回过头来看本节内容。
窗口本质上也是一个算子,所以我们直接来看其实现类:EvictingWindowOperator和WindowOperator。这两个类的区别是前者带驱逐器,后者不带。为了覆盖更多的场景,我们用EvictingWindowOperator来分析。
我们直接从算子最重要的方法processElement开始。如图2-4所示,整个过程从分配窗口(WindowAssigner的主要作用)开始,分配好窗口后,用当前窗口来设置窗口状态的命名空间;之后把当前数据加入状态中(如果是聚合函数的话,还会有计算过程),并用当前数据去判断触发器是否触发,如果触发,那就调用emitWindowContents方法处理数据,该方法的主要过程是调用驱逐器清除数据;然后调用窗口函数计算结果;最后注册一个窗口本身的清除定时器。

图2-4 窗口算子执行流程
主要源代码如下:
public void processElement(StreamRecord<IN> element) throws Exception { final Collection<W> elementWindows = windowAssigner.assignWindows( element.getValue(), element.getTimestamp(), windowAssignerContext); if (windowAssigner instanceof MergingWindowAssigner) { MergingWindowSet<W> mergingWindows = getMergingWindowSet(); W actualWindow = mergingWindows.addWindow(...) } evictingWindowState.setCurrentNamespace(stateWindow); evictingWindowState.add(element); TriggerResult triggerResult = triggerContext.onElement(element); if (triggerResult.isFire()) { Iterable<StreamRecord<IN>> contents = evictingWindowState.get(); if (contents == null) { continue; } emitWindowContents(actualWindow, contents, evictingWindowState); } registerCleanupTimer(window); }
2.3.3 窗口分配器
本节主要介绍Flink中窗口分配器的作用及几种典型实现,这几种典型的实现实际上对应着几种典型的窗口。
熟悉流计算的读者可能知道,窗口(时间窗口)大致可以分为滑动窗口和滚动窗口。那么这个分类是由什么决定的呢?显然它是由数据分配到不同窗口的方式决定的。在Flink中,这个分配的动作就是由窗口分配器完成的。不同的窗口分配器实现类对应不同的窗口。
窗口分配器的接口定义如下:
public abstract class WindowAssigner<T, W extends Window> implements Serializable { private static final long serialVersionUID = 1L; public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context); public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env); public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig); public abstract boolean isEventTime(); public abstract static class WindowAssignerContext { /** * 返回当前的处理时间 */ public abstract long getCurrentProcessingTime(); } }
其中最关键的是assignWindows方法,它用来分配窗口。我们来看几种常用的实现。
1. 滚动窗口
Flink中有TumblingEventTimeWindows和TumblingProcessingTimeWindows两种滚动窗口(Tumbling Window),分别对应基于事件时间的滚动窗口和基于系统时间的滚动窗口。这两种实现分配数据的策略实际上是一样的,只是基于的时间不同。我们来看下TumblingEventTimeWindows的assignWindows方法:
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { // 计算窗口开始的时间 long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size); return Collections.singletonList(new TimeWindow(start, start + size)); } else { throw new RuntimeException( "Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', " + "or did you forget to call " + "'DataStream.assignTimestampsAndWatermarks(...)'?"); } }
可以看到,其实现还是比较清楚的,根据窗口的大小(size)、偏移量(offset)、数据时间计算窗口的开始时间。具体的计算方法如下:
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { return timestamp - (timestamp - offset + windowSize) % windowSize; }
返回一个TimeWindow。
2. 滑动窗口
和滚动窗口一样,滑动窗口(Sliding Window)也有SlidingEventTimeWindows和Sliding-ProcessingTimeWindows两种实现,两种实现也基本是一样的。我们来看SlidingProcessing-TimeWindows的assignWindows方法:
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { timestamp = context.getCurrentProcessingTime(); List<TimeWindow> windows = new ArrayList<>((int) (size / slide)); long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); } return windows; }
首先我们看到一个最明显的区别是返回的TimeWindow个数不同,滚动窗口只返回一个,而滑动窗口返回多个,这也符合我们对滑动窗口的理解:滑动窗口是可以重叠的,一个数据可以落入多个窗口内(可以思考一下一个数据最多可以落入几个窗口内)。与滚动窗口一样,计算最后一个窗口的开始时间,然后不断回溯(前一个窗口的开始时间减去滑动时间)寻找位于时间范围内的窗口,直到窗口的结束时间早于系统时间(或者事件时间)。
3. 会话窗口
会话窗口(Session Window)是Flink中比较独特的窗口类型,其他流式系统不支持它,或支持得不够好。会话窗口可以按照一个会话来分配数据,而会话的长度可以是固定的(EventTimeSessionWindows、ProcessingTimeSessionWindows),也可以是不断变化的(DynamicProcessingTimeSessionWindows、DynamicEventTimeSessionWindows)。使用过会话的读者可能知道,只要不过期会话就可以一直存在,新的数据必然会加入某个会话,同时会导致会话的超时时间发生改变。在Flink中,会话的不断变化就对应着会话窗口的不断合并。我们以EventTimeSessionWindows为例来看下会话窗口的实现,其中比较复杂的是窗口的合并。
会话窗口中数据的分配和滚动窗口很像,即返回一个计算好的窗口(TimeWindow)。
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout)); }
窗口的分配过程结束后,会得到一个窗口。这个新分配的窗口属于哪个会话(真正的窗口)呢?我们来看图2-5中的例子(例子中sessionTimeout=3)。

图2-5 会话窗口
假如Flink接收到数据时间为1的数据(图2-5中的步骤1)(这里我们假设键相同或者是Non-Keyed Window),那么这个时候会生成TimeWindow(1,4),并处理数据时间为5的数据,生成TimeWindow(5,8);然后继续处理时间为3的数据,这个时候应该生成TimeWindow(3,6)的窗口,但是由于TimeWindow(1,4)对应的会话还没有过期,应该把时间为3的数据归到这个会话中,所以Flink中进行TimeWindow的合并。同理,当TimeWindow(1,4)和TimeWindow(3,6)合并为TimeWindow(1,6)的时候,也应该将TimeWindow(5,8)同自己合并,这样最后合并为TimeWindow(1,8)。当然不只是将TimeWindow合并,还需要将窗口对应的触发器、数据合并。我们来看合并的关键代码,合并发生在数据被WindowOperator处理的过程中:
W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() { @Override public void merge(W mergeResult, Collection<W> mergedWindows, W stateWindowResult, Collection<W> mergedStateWindows) throws Exception { if ((windowAssigner.isEventTime() << mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) { throw new UnsupportedOperationException("The end timestamp of an " + "event-time window cannot become earlier than the current watermark " + "by merging. Current watermark: " + internalTimerService.currentWatermark() + " window: " + mergeResult); } else if (!windowAssigner.isEventTime()) { long currentProcessingTime = internalTimerService.currentProcessingTime(); if (mergeResult.maxTimestamp() <= currentProcessingTime) { throw new UnsupportedOperationException("The end timestamp of a " + "processing-time window cannot become earlier than " + "the current processing time " + "by merging. Current processing time: " + currentProcessingTime + " window: " + mergeResult); } } triggerContext.key = key; triggerContext.window = mergeResult; triggerContext.onMerge(mergedWindows); for (W m: mergedWindows) { triggerContext.window = m; triggerContext.clear(); deleteCleanupTimer(m); } // 合并状态 windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows); } });
其中关键的方法是MergingWindowSet的addWindow方法,其中TimeWindow合并的细节在其mergeWindows方法中,合并的规则就是我们上面介绍的。
合并的主要过程如下:
1)找出合并之前的窗口集合和合并之后的窗口;
2)找出合并之后的窗口对应的状态窗口(方式是从合并窗口集合中挑选第一个窗口的状态窗口);
3)执行merge方法(合并窗口需要做的工作,也就是执行MergingWindowSet的addWindow方法)。
这里不好理解的是合并结果的窗口和结果对应的状态窗口(用来获取合并之后的数据),我们来看图2-6。

图2-6 合并窗口
MergingWindowSet(窗口合并的工具类)中有个map,用来保存窗口和状态窗口的对应关系,那么怎么理解这个状态窗口呢?如果我们在得到TimeWindow(1,4)时基于TimeWindow(1,4)在状态中保存了数据(数据A),也就是说状态的命名空间是TimeWindow(1,4),在得到TimeWindow(5,8)时基于TimeWindow(5,8)在状态中保存了数据(数据B),当第三个数据(数据C)来的时候,又经过合并窗口得到了TimeWindow(1,8),那么怎么获取合并窗口的数据集AB呢?显然我们还需要原来的TimeWindow(1,4)或者TimeWindow(5,8),原来的TimeWindow(1,4)在这里就是状态窗口。
这里窗口合并的同时会把窗口对应的状态所保存的数据合并到结果窗口对应的状态窗口对应的状态中。这里有点绕,还是看图2-6,最终合并窗口的结果窗口是TimeWindow(1,8)。我们怎么获取TimeWindow(1,8)对应的数据集ABC呢?这个时候可以通过MergingWindowSet中保存的TimeWindow(1,8)对应的状态窗口TimeWindow(1,4)来获取合并后的状态,即数据集ABC。
会话窗口的其他过程与滑动窗口及滚动窗口没有什么区别。
4. 全局窗口
全局窗口(Global Window),顾名思义就是所有的元素都分配到同一个窗口中,我们常用的Count Window就是一种全局窗口。其实现GlobalWindow的主要方法如下:
public Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { return Collections.singletonList(GlobalWindow.get()); }
这里需要说明的是全局窗口和Non-Keyed Window是完全不同的概念:Non-Keyed Window是指并发为1的窗口,可以是滚动窗口或者滑动窗口;而全局窗口既可以是Non-Keyed Window,也可以是Keyed Window。
2.3.4 触发器
本节主要介绍窗口中触发器的作用以及几种典型触发器的实现。
触发器决定窗口函数什么时候执行以及执行的状态。触发器通过返回值来决定什么时候执行,其返回值有如下几种类型。
- CONTINUE:什么也不做。
- FIRE:触发窗口的计算。
- PURGE:清除窗口中的数据。
- FIRE_AND_PURGE:触发计算并清除数据。
其接口定义如下(列出主要方法):
public abstract class Trigger<T, W extends Window> implements Serializable { //每个增加到窗口中的数据都需要调用该方法,根据返回结果判定窗口是否触发 public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception; //当注册的系统时间定时器到期后调用,其调用是通过WindowOperator中的triggerContext进行的 public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception; //当注册的事件时间定时器到期后调用,其调用是通过WindowOperator中的triggerContext进行的 public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception; //主要用在sessionWindow,窗口合并的时候调用 public void onMerge(W window, OnMergeContext ctx) throws Exception { throw new UnsupportedOperationException("This trigger does not support merging."); } }
Flink实现了几种常用的触发器。
- EventTimeTrigger:当水位线大于窗口的结束时间时触发,一般用在事件时间的语义下。
- ProcessingTimeTrigger:当系统时间大于窗口结束时间时触发,一般用在系统时间的语义下。
- CountTrigger:当窗口中的数据量大于一定值时触发。
- DeltaTrigger:根据阈值函数计算出的阈值来判断窗口是否触发。
其中经常会用到的是根据系统时间和事件来判断窗口是否触发的触发器,我们来看下其实现过程。
我们先来看ProcessingTimeTrigger是怎么实现的。
@Override public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) { ctx.registerProcessingTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; }
在onElement方法中,调用triggerContext注册了窗户最大时间的定时器,tiggerContext中调用InternalTimerService来进行定时器注册。InternalTimerService是Flink内部定时器的存储管理类。整个调用及实现过程如图2-7所示。

图2-7 ProcessingTimeTrigger
InternalTimerServiceImpl内部维护了一个有序的队列,用来存储定时器(TimerHeap-InternalTimer),并且利用ProcessingTimeService来延迟调度基于系统时间生成的Trigger-Task。TriggerTask会调用InternalTimerServiceImpl的onProcessingTime方法,onProcessing-Time会调用真正的目标(WindowOperator)onProcessingTime方法,完成一次定时器的触发。在InternalTimerServiceImpl调用onProcessingTime方法的过程中,会重设上下文(Context)的键,确保后续操作都是针对当前键对应的数据。
那么EventTimeTrigger和ProcessingTimeTrigger在实现上有什么不一样呢?
首先我们知道,基于事件时间的触发器必然与事件时间有关。而事件时间不是有序的,不能像系统时间那样,用延迟任务来触发。那么什么时候触发基于事件时间的定时器呢?水位线(Watermark)在Flink中是用来推动基于事件时间的处理动作执行的,也就是说水位线代表了事件的最晚到达时间。我们就可以采用水位线来触发基于事件时间的定时器,事实上Flink也是如此实现的,我们来看代码:
@Override public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { // 如果水位线经过窗口,那么就触发 return TriggerResult.FIRE; } else { ctx.registerEventTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } }
以上代码是EventTimeTrigger的onElement方法,与ProcessingTimeTrigger一样,如果条件不满足,那就调用TriggerContext来注册一个事件时间定时器,这里的依据是水位线是否大于窗口最大时间。同样,TriggerContext会调用InternalTimerServiceImpl的registerEventTimeTimer来真正注册定时器,InternalTimerServiceImpl注册的动作也就是把定时器(TimerHeapInternalTimer)放到一个有序队列中(eventTimeTimersQueue),之后就等水位线来触发。
如图2-8所示,整个触发过程是通过StreamTask处理水位线来驱动的,经过一系列的调用,由InternalTimeServiceManager完成触发器的触发,触发条件是水位线大于定时器的时间。

图2-8 EventTimeTrigger
上面分析了EventTimeTrigger和ProcessingTimeTrigger的实现过程,其他触发器,如CountTrigger相对简单些,通过条件(数量是否大于阈值)就可以完成是否触发的判断,这里不再讨论。下一节介绍当窗口完成触发的时候,窗口函数怎么执行。
2.3.5 窗口函数
上一节分析了触发器,本节来看下窗口触发之后的计算过程,也就是窗口函数(Window Function)。
Flink中的窗口函数主要有ReduceFunction、AggregateFunction、ProcessWindow-Function三种(FoldFunction理论上可以通过AggregateFunction实现,并且Flink从1.8版本开始已经把该函数标记为Deprecated,因此该函数我们不再讨论)。在实际使用中推荐使用前两种,因为它们是增量计算,每条数据都会触发计算,而且窗口状态中只保留计算结果。而ProcessWindowFunction(或者使用了驱逐器之后)需要窗口把所有的数据保留下来,到窗口触发的时候,调用窗口函数计算,效率比较低,而且会造成大量状态缓存。下面我们详细看下前两种窗户函数的实现。
1. ReduceFunction
ReduceFunction的接口定义如下:
public interface ReduceFunction<T> extends Function, Serializable { T reduce(T value1, T value2) throws Exception; }
ReduceFunction是一个输入、输出类型一样的简单聚合函数,可以用来实现max()、min()、sum()等聚合函数。在WindowOperator中并不直接使用ReduceFunction作为算子的userFunction,而要经过层层包装。主要包装类有两类。一类是WindowFunction,用来指导具体的窗口函数怎么计算。比如PassThroughWindowFunction,它表示不调用用户的窗口函数,直接输出结果,用来包装ReduceFunction和AggregateFunction,因为这两个函数在窗口触发的时候已经计算好了结果,只需要发送结果即可。另一类是InternalWindowFunction的实现类,主要用来封装窗口数据的类型,然后实际调用前面讲的第一类包装窗口类。这么讲有点抽象,我们具体来看ReduceFunction函数在Flink中是怎么调用的。
我们看在WindowStream中调用reduce方法之后会发生什么。
public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) { if (function instanceof RichFunction) { throw new UnsupportedOperationException( "ReduceFunction of reduce can not be a RichFunction. " + "Please use reduce(ReduceFunction, WindowFunction) instead."); } // 清除闭包 function = input.getExecutionEnvironment().clean(function); return reduce(function, new PassThroughWindowFunction<K, W, T>()); }
接着调用重载的reduce方法(下面只列出关键代码):
public <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) { operator = new WindowOperator<>( windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), stateDesc, new InternalSingleValueWindowFunction<>(function), trigger, allowedLateness, lateDataOutputTag); }
可以看到,最终传给WindowOperartor的function是一个new InternalSingleValue-WindowFunction (new PassThroughWindowFunction())的实例对象。PassThroughWindow-Function我们在前面讲过,该函数什么也不做,只是把输出发送出去。再看InternalSingle-ValueWindowFunction,它也是基本什么都不做(只是把单个input对象转为集合对象,这就是我们刚才说的,该类包装类用来把输入转换为合适的类型),只是调用刚才传入它内部的PassThroughWindowFunction,WindowOperator最终拿到的窗口函数就是把结果发送出去,不进行任何计算。
那么我们传入的ReduceFunction怎么起作用?什么时候调用呢?我们来看ReduceFunction传入WindowedStream之后用在了哪里,还是刚才的reduce方法:
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>( "window-contents", reduceFunction, input.getType().createSerializer(getExecutionEnvironment().getConfig()));
由这样一段代码可以看到,reduceFunction被放到了StateDescriptor中,用来生成我们需要的ReducingState,并且reduceFunction被传递给ReducingState,用来进行真正的计算。我们来看ReducingState的实现类RocksDBReducingState的add方法:
public void add(V value) throws Exception { byte[] key = getKeyBytes(); V oldValue = getInternal(key); // 这里reduceFunction函数被调用 V newValue = oldValue == null ? value : reduceFunction.reduce(oldValue, value); updateInternal(key, newValue); }
这里可以再看下图2-4所示的窗口算子执行流程图,会更清晰易懂。
2. AggregateFunction
AggregateFunction是对ReduceFunction的扩展,可以接受三种类型的参数——输出、计算和输出,它的适用范围比ReduceFunction更广。其实现过程与ReduceFunction基本一致,这里不再赘述。
到这里窗口的主要概念和设计实现原理都介绍完了,大家如果有兴趣,可以根据本章的介绍去分别实现一种自己定制的窗口分配器、触发器及窗口函数。