
Reading Data with a Spout

  • DataMiner

What a Spout Does

Spout: A spout is a source of streams in a topology. Generally spouts will read tuples from an external source and emit them into the topology (e.g. a Kestrel queue or the Twitter API). Spouts can either be reliable or unreliable. A reliable spout is capable of replaying a tuple if it failed to be processed by Storm, whereas an unreliable spout forgets about the tuple as soon as it is emitted.

Spouts can emit more than one stream. To do so, declare multiple streams using the declareStream method of the OutputFieldsDeclarer interface and specify the stream to emit to when using the emit method on the SpoutOutputCollector class.

The main method on spouts is nextTuple. nextTuple either emits a new tuple into the topology or simply returns if there are no new tuples to emit. It is imperative that nextTuple does not block for any spout implementation, because Storm calls all the spout methods on the same thread.

The other main methods on spouts are ack and fail. These are called when Storm detects that a tuple emitted from the spout either successfully completed through the topology or failed to be completed. ack and fail are only called for reliable spouts. See the Storm documentation for more information.

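In short, a Spout must not contain blocking calls, because all of its methods run on a single thread. The other methods that matter in a Spout are ack() and fail().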
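The two points above (a nextTuple that never blocks, and ack/fail driving replay in a reliable spout) can be illustrated with a small plain-Java model. This is only a sketch: ReliableSpoutSketch, its in-memory queue, and the emitted list are hypothetical stand-ins and are not Storm's ISpout or SpoutOutputCollector.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Simplified stand-in for a reliable spout; NOT Storm's real API.
class ReliableSpoutSketch {
    private final Queue<String> source = new ArrayDeque<>();    // stands in for the external source
    private final Map<Long, String> pending = new HashMap<>();  // emitted but not yet acked
    final List<String> emitted = new ArrayList<>();             // stands in for the output collector
    private long nextId = 0;

    void offer(String sentence) {
        source.add(sentence);
    }

    // Returns immediately when there is nothing to emit; never waits on the source.
    void nextTuple() {
        String sentence = source.poll(); // poll() returns null instead of blocking
        if (sentence == null) {
            return;
        }
        long msgId = nextId++;
        pending.put(msgId, sentence);    // remember the tuple so it can be replayed
        emitted.add(sentence);
    }

    // The tuple was fully processed: forget it.
    void ack(long msgId) {
        pending.remove(msgId);
    }

    // The tuple failed: put it back so a later nextTuple() emits it again.
    void fail(long msgId) {
        String sentence = pending.remove(msgId);
        if (sentence != null) {
            source.add(sentence);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

An unreliable spout would simply skip the pending map: once a tuple is emitted there is nothing left to replay.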

How a Spout Processes Data

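As noted above, before a Spout can send data with the emit() method of SpoutOutputCollector, the stream has to be declared through OutputFieldsDeclarer, i.e. with declareStream(). Sending data therefore breaks down roughly into two steps:

  1. Declare the stream through OutputFieldsDeclarer.declareStream().
  2. Call the emit() method.

So how is the data actually read? A first guess is that it happens in step 1, in declareStream(). Since OutputFieldsDeclarer is an interface, which class is supposed to implement it?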

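This naturally brings to mind the Storm-Starter examples that ship with Storm. One of them is a class called RandomSentenceSpout, declared as public class RandomSentenceSpout extends BaseRichSpout. The OutputFieldsDeclarer interface we were looking for does not appear there, but the class does override the declareOutputFields method: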

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
}

At this point you may wonder why the documentation talks about declareStream on OutputFieldsDeclarer while this code calls declare. In fact, declare calls declareStream internally and passes in the default stream id.
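The relationship between declare and declareStream can be sketched as follows. This is a simplified plain-Java model, not Storm's source: DeclarerSketch is a hypothetical class, field lists are plain strings rather than Fields objects, and the only detail taken from Storm is that the default stream id is the string "default".

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of an output fields declarer; NOT Storm's real class.
class DeclarerSketch {
    static final String DEFAULT_STREAM_ID = "default";

    // stream id -> declared field names, in declaration order
    private final Map<String, List<String>> streams = new LinkedHashMap<>();

    // Declaring without a stream id just delegates to the default stream.
    void declare(String... fields) {
        declareStream(DEFAULT_STREAM_ID, fields);
    }

    // Declaring a named stream; each stream id may only be declared once.
    void declareStream(String streamId, String... fields) {
        if (streams.containsKey(streamId)) {
            throw new IllegalArgumentException("Fields for " + streamId + " already set");
        }
        streams.put(streamId, Arrays.asList(fields));
    }

    Map<String, List<String>> getStreams() {
        return streams;
    }
}
```

Calling declare("word") and then declareStream("errors", "reason") on this model yields two streams, "default" and "errors", which is the same mechanism a spout uses to emit more than one stream.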

declareOutputFields must be a method inherited through BaseRichSpout. Looking at the source of BaseRichSpout (an abstract class), we find that it extends BaseComponent implements IRichSpout and defines five methods:

close(); activate(); deactivate(); ack(Object msgId); fail(Object msgId);

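declareOutputFields is not among them, so it must come from BaseComponent. The method finally turns up in org.apache.storm.topology.IComponent.
The inheritance/implementation chain is therefore: RandomSentenceSpout extends BaseRichSpout, which extends BaseComponent, which implements IComponent.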

The API documentation describes declareOutputFields as follows:

Declare the output schema for all the streams of this topology. declarer is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream.
In other words, it defines the properties of the output streams of the current topology.

So how does this method get used?
In the main method we can see:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);

This calls setSpout, which hands the Spout over to the topology.
RandomSentenceSpout keeps producing data through its nextTuple method. Once data has been produced it has to be sent into the topology, so how is it identified? As mentioned above, emitting requires the stream to have been declared through OutputFieldsDeclarer.declareStream(). That method is reached through the following call chain:
TopologyBuilder.createTopology() -> getComponentCommon() -> component.declareOutputFields(outputFieldsGetter) -> OutputFieldsDeclarer.declare().
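The key point of this call chain is that the builder, not the spout, invokes declareOutputFields when the topology is created, passing in its own declarer object to collect the schema. A minimal plain-Java model of that idea follows; FieldsCollector, SketchComponent, and SketchBuilder are hypothetical names, and the real TopologyBuilder does considerably more.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins, not Storm's real types.
interface FieldsCollector {
    void declare(String... fields);
}

interface SketchComponent {
    void declareOutputFields(FieldsCollector declarer);
}

class SketchBuilder {
    private final Map<String, SketchComponent> components = new LinkedHashMap<>();

    // Registering a spout only stores it; nothing is declared yet.
    void setSpout(String id, SketchComponent spout) {
        components.put(id, spout);
    }

    // At build time, ask every component for its output fields by handing it a collector.
    Map<String, List<String>> createTopology() {
        Map<String, List<String>> schema = new LinkedHashMap<>();
        for (Map.Entry<String, SketchComponent> entry : components.entrySet()) {
            List<String> fields = new ArrayList<>();
            entry.getValue().declareOutputFields(names -> fields.addAll(Arrays.asList(names)));
            schema.put(entry.getKey(), fields);
        }
        return schema;
    }
}
```

With this model, registering a component whose declareOutputFields calls declarer.declare("word") under the id "spout" produces a schema that maps "spout" to the single field "word".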

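Now everything lines up. Through several levels of inheritance the Spout implements the declareOutputFields method of IComponent, and inside that method it calls declare (and thus declareStream) on the OutputFieldsDeclarer. The topology then builds the stream mapping, and the emit method sends the data into the topology. declareOutputFields is therefore a method that must be implemented. The fieldsGrouping in the topology is crucial: it refers to the Fields passed to declare. emit in turn corresponds to what was declared, so together they form a chain. nextTuple calls emit, which maps the data into the topology, and finally the execute method of a Bolt processes each tuple it receives.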
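To make the link between the declared Fields, the emitted values, and fieldsGrouping concrete, here is a rough plain-Java sketch. It assumes that emitted values are matched to declared field names by position and that a fields grouping routes a tuple by hashing the value of the chosen field; FieldsGroupingSketch and chooseTask are hypothetical, and Storm's actual hashing and task selection differ in detail.

```java
import java.util.List;

// Hypothetical illustration of routing by a declared field; NOT Storm's implementation.
class FieldsGroupingSketch {
    // Picks a target task index for one tuple, keyed on a single declared field.
    static int chooseTask(List<String> declaredFields, List<Object> values,
                          String groupingField, int numTasks) {
        // The grouping field must be one of the names passed to declare().
        int index = declaredFields.indexOf(groupingField);
        if (index < 0) {
            throw new IllegalArgumentException("Field not declared: " + groupingField);
        }
        // Values are positional: the i-th emitted value belongs to the i-th declared field.
        Object key = values.get(index);
        // Equal keys always hash to the same task.
        return Math.floorMod(key.hashCode(), numTasks);
    }
}
```

In this model, two tuples carrying the same value for the "word" field always land on the same task, while grouping on a field that was never declared fails immediately.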

Some useful links: Lifecycle of a topology

Ways for a Spout to Read Data

Storm can be integrated with many platforms to read data, HDFS being one example. A follow-up post will cover reading data from HDFS.