导图社区 flink窗口相关
flink窗口的相关组成组件,使用技术实现细节等等分享。
编辑于2020-11-20 15:27:55flink
stream窗口
注意遇到问题
多次使用时间窗口时,只需要调用一次assignTimestampsAndWatermarks,多次调用,导致后面的时间窗口不输出
按每秒统计时,使用eventtime,可能出现乱序太厉害导致统计数据丢失,没有必要,还是使用processtime 或者严格把控写入kafka的eventtime的顺序
每个event都会被复制保存到它应该属于的窗口中,比如滑动窗口一条event可能属于多个window,那么就会有多个event的副本存在,当每秒统计一天的数据时,会造成性能问题.
什么是窗口?
流相当于一条永不停机的生产流水线 窗口则是一个闸门,拦住囤积生产的商品,定时或者囤积到一定数量释放一次.
窗口生命周期
创建
第一个属于该窗口的时间出现
结束
第一个超过该窗口时间范围(包括延迟时间)的时间出现
存活时间
窗口的范围,比如统计5分钟 + 允许的延迟时间,比如2分钟=7分钟
keyby和nokeyby
按key分组,相同key发送到相同task中,可以并行处理
未按key分组,所用数据发送到同一个,无法并行处理
窗口组成
assigner
数据发送到哪个窗口
内置的时间窗口assigner
tumbling windows
sliding windows
session windows
global windows
使用时,需要自定义trigger,不然不会触发释放内容
windowfunction
窗口方法. 当窗口中的数据积攒到需要触发时,怎么处理数据,比如计数,求和等,也就是聚合函数.
聚合函数,来一条和之前的数据累加计算,实时处理
常用方法
ReduceFunction
合并两条输入数据
输入和输出格式相同
AggregateFunction
reducefunction的一般式
更灵活,可自定义聚合中间过程,和不一样的输出结果
FoldFunction
不可以在session window中使用,以及其他mergeable windows
给定一个初始变量,将后续的每一个数据与初始值累加作为下一次的初始值
比如: a,b,c,d 四条数据,和 "word"初始值, 运行的效果: "word"+a+b+c+d
可增量式处理,来一条和之前的数据累加计算,实时处理
ProcessWindowFunction
窗口中的所有数据缓冲起来,最后一次性进行计算
processWindowFunction和其他方法联合使用
其他方法时增量的,当最后执行完成后,通过proccessFuncton做收尾工作,比如 keyby的时候将key和数据分开了,通过这种方法将key结合到消息中 其他方法执行完后,将时间信息合并到消息中,用于后续的处理
WindowFunction
旧时代的遗留产物,类似于processFunction
相比于processFunction,context是有限的,而且没有新的功能,比如处理per-window state功能
窗口局部状态(per-window state)
比如keyby后进行窗口操作,那么每个key都会分配一个窗口而不是所有key共用一个窗口,keyby后类似于多线程处理,窗口局部状态则为ThreadLocal变量
trigger
窗口什么时候触发发送
接口方法
onElement()
每来到一条event就调用一次
onEventTime()
注册的event定时器触发了,就调用
onProcessingTime()
注册的process定时器触发,就调用
onMerge()
当两个有状态的窗口合并时,合并两个窗口的状态,比如sessionwindow
clear()
删除窗口时,执行一些清理工作
通过返回结果TriggerResult通知flink窗口应该处于什么状态
CONTINUE
窗口继续工作,啥也不用处理
FIRE
触发窗口的计算,比如开始调用窗口方法,只触发不会清理数据
调用processfunction,将所有窗口数据进行处理
调用增量式处理方法时,直接返回之前计算的最后一条结果
PURGE
清除窗口中的内容
只会清理窗口中的event,但是不会清理meta-information(窗口的相关信息)和trigger的state等
FIRE_AND_PURGE
触发计算并清理窗口
这些方法都可以用来注册event或process定时器
默认trigger
EventTimeTrigger
eventtime窗口的trigger
ProcessingTimeTrigger
CountTrigger
PurgingTrigger
将其他trigger作为参数进行转化
NeverTrigger
做为globalWindow的trigger,永远不会fire,所以使用globalwindow时,需要自定义trigger
自定义trigger
自定义trigger会覆盖默认windowAssigner的trigger,这样可以自定义窗口,比如合并时间窗口和计数窗口
evictor
trigger触发后或者窗口方法执行前后,数据怎么处理,是留着,还是清理
接口方法
evictBefore()
窗口方法执行前执行,参数包含执行前的窗口中的数据
evictAfter()
窗口方法执行之后,窗口中的数据怎么处理
处理的是窗口中的数据,而不是窗口函数的输出数据 比如: 窗口中有8条数据,processfunction输出1条数据,此接口处理的都是8条数据 也就是evictBefore和evictAfter的参数都是8条数据,包括size参数也为8
自带实现
默认的实现都是在windowfunciton 执行之前插入逻辑
CountEvictor
只保留窗口中固定大小的数量的数据
DeltaEvictor
TimeEvictor
将超过最后的时间-间隔的数据清除,也就是旧的数据
注意事项
evictor在窗口方法执行之前执行时,即使时增量处理的窗口方法(比如AggregateFunction),每条消息也要经过evictor过滤
flink不保证窗口内的数据的顺序,这意味着,evictor移除window中的第一条数据,但是那条数据不一定是最先到达窗口中的
allowedLateness
使用globalWindow时,不会有数据是迟到的,因为globalWindow的结束时间是Integer.MAX_VALUE
通过调用allowedLateness方法,实现允许数据延迟到达窗口
side output
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){}; DataStream<T> input = ...; SingleOutputStreamOperator<T> result = input .keyBy(<key selector>) .window(<window assigner>) .allowedLateness(<time>) .sideOutputLateData(lateOutputTag) .<windowed transformation>(<window function>); DataStream<T> lateStream = result.getSideOutput(lateOutputTag); 首先调用sideOutLateData设置标签 然乎通过getSideOutput方法获取迟到数据流
迟到被丢弃的数据可以通过此方法捕捉获取到
late firings
When specifying an allowed lateness greater than 0, the window along with its content is kept after the watermark passes the end of the window. In these cases, when a late but not dropped element arrives, it could trigger another firing for the window. These firings are called late firings, as they are triggered by late events and in contrast to the main firing which is the first firing of the window. In case of session windows, late firings can further lead to merging of windows, as they may “bridge” the gap between two pre-existing, unmerged windows. Attention You should be aware that the elements emitted by a late firing should be treated as updated results of a previous computation, i.e., your data stream will contain multiple results for the same computation. Depending on your application, you need to take these duplicated results into account or deduplicate them.