• +86-156-1535-0639
  • jianpengqi@126.com

Storm Window中的数据填充方式以及顺序

  • QI-Jianpeng

Apache Storm 一个没有解释透彻的问题就是Sliding Window中的TupleWindow在面对数据流到达时是慢慢填充满整个窗口的, 不是一次性填充满然后进行相应处理的.

而Tumbling Window是对当前窗口汇总完后在处理. 也就是不存在窗口初始化的问题, 一直是一个被填充满窗口. 需要注意的是, 这种窗口是汇总来自于同一个component的不同task的数据.

TupleWindow中会提供三种方法, 分别为get(), getNew(), getExpired(). get()方法用来获取当前的一个窗口内的所有数据, 这个窗口里的数据排列方式是按照接收到event的顺序进行排序的, 因此对于一个tuple属于哪一个Slide, 这是需要额外的计算的(比如通过time/slideInterval计算).
而对于getNew()getExpired()方法来说, 这是严格的按照接收到的时间戳顺序来计算的.

在tuplewindow中的数据可能是乱序的

这里有一个例子, 比如设置windowLength = 6, interval = 2, lag = 6; 我在spout中除了发送流水时间的tuple外, 还同时发送在这个tuple的时间戳基础上+3的tuple, 可能就会得到以下数据, 其中[a, b, c] 中a代表生成的随机整数, b代表时间戳(由msgId而来), c代表msgId同时也代表真实的时间戳信息, 下面这个例子演示了TupleWindow填充数据, 以及过期数据的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
news tuple: [265, 1, 1]
tuple in window[265, 1, 1]
news tuple: [7, 4, 2]
news tuple: [298, 3, 3]
tuple in window[265, 1, 1]
tuple in window[7, 4, 2] # 注意这里人造时间戳是4, 真实的时间戳是2
tuple in window[298, 3, 3]
news tuple: [952, 6, 4]
news tuple: [862, 5, 5]
tuple in window[265, 1, 1]
tuple in window[7, 4, 2] # 在这个地方, 它的顺序仍然没有发生变化.
tuple in window[298, 3, 3]
tuple in window[952, 6, 4]
tuple in window[862, 5, 5]
news tuple: [334, 8, 6]
news tuple: [160, 7, 7]
tuple in window[7, 4, 2]
tuple in window[298, 3, 3]
tuple in window[952, 6, 4]
tuple in window[862, 5, 5]
tuple in window[334, 8, 6]
tuple in window[160, 7, 7]
expired tuple: [265, 1, 1]

这个例子可以说明, 当Spout数量不唯一时, 由于网络环境的不一致, 可能就会造成接受tuple乱序. 同时在tuplewindow里的数据顺序没有发生变化, 这说明存储数据的其实是一个普通的静态结构, 也就是说, 插入数据时它不会引起按照时间戳的顺序进行排序这一操作. 事实上,Storm使用的是ConcurrentLinkedQueue 来存储的. 而对于getExpired()这种方法, Storm采取的方式是通过扫描这个window queue来获取过期的数据. 所以不管你窗口内的数据乱不乱序, 反正我过滤一下, 都会把你过期的数据提取出来. 这是需要注意的地方.
(顺便提一下, 这个地方Storm是否可以进行一下改善? 既然我指定了时间戳信息, 那么到达数据的的排序应该是按照设定的时间顺序进行排序, 而不是按照接收到数据的添加时间进行排序.)

这个追溯源码的话也可以看一下WindowManager这个类, 在apache.storm.windowing包下.

当然, 对于TumblingWindow来说, 它是不单单是滚动窗口, 还是汇总窗口 getNew()getExpired()是无效的.

这里补充一点关于Guarantees的描述:

Note that the configuration topology.message.timeout.secs should be sufficiently more than windowLength + slidingInterval for time based windows; otherwise the tuples will timeout and get replayed and can result in duplicate evaluations. For count based windows, the configuration should be adjusted such that windowLength + slidingInterval tuples can be received within the timeout period.

简单点就是 topology.message.timeout.secs >> windowLength + slidingInterval, 不然可能会造成一个窗口期内接收到重复的tuples.