• +86-156-1535-0639
  • jianpengqi@126.com

Storm TumblingWindow 解释

  • DataMiner

TumblingWindow 的使用

在数据流处理中不可避免的会涉及到Window的概念.
Storm中提供了两种类型的Window, 分别为SlidingWindowTumblingWindow.
前者是窗口一个一个Slide的往前滑动, 而后者是一个一个Window的往前滑动.
这里主要讲述TumblingWindow. 如下图:

1
2
3
| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0 5 10 15 -> time
w1 w2 w3

TumblingWindow中, w1滑动将会变为w2, 可以看出w1w2之间是没有交叉重叠的. 并且这个例子中是每隔5s滑动一次.

那么如何使用呢? Storm中默认的是使用以下方法对TumblingWindow进行定义.

1
2
3
4
5
withTumblingWindow(BaseWindowedBolt.Count count)
Count based tumbling window that tumbles after the specified count of tuples.
withTumblingWindow(BaseWindowedBolt.Duration duration)
Time duration based tumbling window that tumbles after the specified time duration.

我们可以看出有两种方式, 分别为count-basedtime-based.
当我们使用time-based, Storm默认情况下所使用timestampBolt处理当前tuple时所获得的timestamp.
在实际应用中可能会用自定义的time来使用time-based的窗口(比如使用历史数据进行测试, 历史数据中存在一个timestamp的字段). 这种情况下该如何做呢?

自定义timestamp的使用

Storm提供了自定义timestamp的方法.

1
2
3
4
5
6
7
8
/**
* Specify a field in the tuple that represents the timestamp as a long value. If
* this field is not present in the incoming tuple, an
*{@link IllegalArgumentException} will be thrown.
*
* @param fieldName the name of the field that contains the timestamp
*/
public BaseWindowedBolt withTimestampField(String fieldName)

在流过来的tuple中会寻找fieldName这个字段. 由于是在tuple中查找, 所以这就追溯到上游发送数据流的节点.
emit tuple时我们记得要实现declareOutputFields方法, 所以, 在这个地方我们要使用自定义的timestamp必须要在上游组件中的declareOutputFields方法里添加一个"timestamp"(或者其他有意义的名字)来指明这个字段, 然后在withTimestampField(String fieldName)传入"timestamp".

还需要注意的是, "timestamp"所指向的字段的类型必须是long型的.

bolt接收到数据后如何进行处理呢?

Bolt对接收到的窗口数据进行处理

在继承BaseWindowedBolt类后会Override execute(TupleWindow inputWindow)方法:

1
2
3
4
@Override
public void execute(TupleWindow inputWindow) {
}

在这个方法中要传入TupleWindow类型的参数. TupleWindow提供了三种方法用于对窗口数据进行操作:

1
2
3
get();//Gets the list of events in the window.
getExpired();//Get the list of events expired from the window since the last time the window was generated.
getNew();//Get the list of newly added events in the window since the last time the window was generated.

所以, 要想获取整个窗口内的数据, 调用get()方法. getExpired()用于获取刚过期的数据. getNew用于获取新到达的数据.

但是, 针对TumblingWindow, 什么才是Expired, 什么才是New呢? SlidingWindow很好理解, New就是最新的Slide, Expire就是刚过期的Slide. 但是TumblingWindow是整个窗口啊, 又对应什么呢?
BaseWindowedBolt中有这么一段代码:

1
2
3
4
5
6
7
8
/**
* A time duration based tumbling window.
*
* @param duration the time duration after which the window tumbles
*/
public BaseWindowedBolt withTumblingWindow(Duration duration) {
return withWindowLength(duration).withSlidingInterval(duration);
}

.withSlidingInterval(duration)这里就看出来了, 其实TumblingWindow的Slide Interval 就是窗口长度自身. 也就是说, 在TumblingWindowgetNew以及getExpired是没什么意义的. 如果调用getExpired, 那么返回的将是上一个TumblingWindow的数据. 如果getNew, 那就是新来的窗口的数据.