• +86-156-1535-0639
  • jianpengqi@126.com

Storm Worker内部消息处理

  • QI-Jianpeng

Worker内部消息处理顺序

Worker process

每一个worker 进程含有一个receive thread that listens on the worker’s TCP port( supervisor.slots.ports). 负责将接收到的数据放在executors线程的接收数据队列(Disruptor).

topology.receiver.buffer.size: 接收线程的batch size, 使用ArrayList类型, 默认8个元素, 为了保证后续与Disruptor结合, 设置时需要是2的指数(e.g.:16). 用ArrayList的原因是接收到的数据是私有的, 没有进行共享. 注: 设置的太大会造成heartbeat thread饿死, 吞吐量暴跌等.

1
2
3
// Example: configuring via Java API
Config conf = new Config();
conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 16); // default is 8

topology.transfer.buffer.size: 代表发送的tuples数量, 默认大小为1024.

1
2
// Example: configuring via Java API
conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32); // default is 1024

Executors(线程)

每一个executor都有自己的接收与发送队列. 可以通过topology.executor.receive.buffer.size, topology.executor.send.buffer.size进行配置.

topology.executor.receive.buffer.size : executor接收队列大小, 每一个元素是一个包含多个tuple的list. 默认大小1024, 必须是2的指数.

1
2
// Example: configuring via Java API
conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); // batched; default is 1024

topology.executor.send.buffer.size: executor的发送数据队列. 每一个元素为单一的tuple. 默认1024大小, 必须是2的指数.

1
2
// Example: configuring via Java API
conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384); // individual tuples; default is 1024

每一个executor含有一个用户逻辑线程(Spout/bolt), 一个将数据从executor发送队列移动到worker transfer队列的线程.