
Reading HDFS Data with a Spout


Combining the Storm Spout with HDFS

On the Storm HDFS Integration page you will see the sentence "Storm components for interacting with HDFS file systems".
In other words, Storm already ships components for interacting with HDFS.
So no extra plumbing is needed here; as long as we use those components, we can make Storm talk to HDFS.

Although Storm is not a great fit for processing large amounts of pre-existing data (it is better suited to real-time streams), the need still comes up: for example, an experiment may require labelled data, and that data has already been prepared by hand.

Reading the data

Since this is a task done as part of everyday study and work, for convenience every file stored on HDFS lives under /storm/data/.
For installing Hadoop, see the reference.

sbin/start-dfs.sh # start HDFS
bin/hdfs dfs -mkdir -p /storm/data # create the /storm/data directory
bin/hdfs dfs -ls -R / # check that the directory was created
bin/hdfs dfs -put ~/Downloads/10g_50obj_50t_sorted.txt /storm/data # upload the file to HDFS

This gives us the path hdfs://localhost/storm/data.
One extra note: make sure fs.defaultFS in core-site.xml is configured correctly. I forgot the trailing / in <value>hdfs://localhost:9000/</value> and it took a long time to track the error down.
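
A quick way to double-check what the client actually picks up is to load core-site.xml into a Hadoop Configuration and print fs.defaultFS. This is just a small verification sketch; the path to core-site.xml below is an example and should be adjusted to your Hadoop install.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// Print the configured default filesystem; it should show hdfs://localhost:9000/
// if core-site.xml was edited as described above.
Configuration conf = new Configuration();
conf.addResource(new Path("/usr/local/hadoop/etc/hadoop/core-site.xml")); // example path, adjust to your install
System.out.println(conf.get("fs.defaultFS"));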

Code for reading the data

import java.io.InputStreamReader;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
////////////////////////////////////
FileSystem fs = FileSystem.get(URI.create("hdfs://localhost"), new Configuration());
String hdfspath = "/storm/data/10g_50obj_50t_sorted.txt";
Path path = new Path(hdfspath); // the path must be passed in; Path has no no-arg constructor
if (fs.exists(path)) {
    InputStreamReader reader = new InputStreamReader(fs.open(path));
    // do stuff with the reader
} else {
    LOG.info("Does not exist {}", hdfspath); // LOG is an SLF4J Logger
}

Since the integration here has to go through a spout, fortunately Storm provides the class org.apache.storm.hdfs.spout.HdfsSpout. Through this class we can operate on files in HDFS.

So how does HdfsSpout emit data?
The nextTuple method of HdfsSpout contains one crucial line:

List<Object> tuple = reader.next();

That is, a FileReader instance named reader reads the file on HDFS, and each call to next() returns one line of the specified file; in other words, next() reads line by line (a sketch of a bolt that consumes these line tuples follows the snippet below). The declareOutputFields method is implemented as:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    if (outputStreamName != null) {
        declarer.declareStream(outputStreamName, outputFields);
    } else {
        declarer.declare(outputFields);
    }
}
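
Since every tuple carries exactly one line of text, a downstream bolt only needs to read that single field. Here is a minimal sketch of such a bolt; PrintLineBolt is just an illustrative name, not part of storm-hdfs, and it assumes the spout is configured with a single output field, as in the topology code further down.

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

// Sketch of a downstream bolt: each tuple from HdfsSpout is assumed to carry
// one line of the HDFS file as its first (and only) field.
public class PrintLineBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String line = tuple.getString(0); // one line of the file per tuple
        System.out.println(line);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: nothing is emitted downstream
    }
}

It could be wired in just like the HdfsBolt below, e.g. topologyBuilder.setBolt("printbolt", new PrintLineBolt(), 1).shuffleGrouping("hdfsspout").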

Below is the code that wires up reading from and writing to HDFS:

public static void main(String[] args) throws TException, InterruptedException {
    // 1 - parameters
    String hdfsUri = "hdfs://localhost:9000";
    String sourceDir = "/storm/data/";
    String archiveDir = "/storm/done/";
    String badFilesDir = "/storm/badfiles/";
    // 2 - create spout and bolt
    HdfsSpout hdfsSpout = new HdfsSpout().withOutputFields(TextFileReader.defaultFields)
            .setHdfsUri(hdfsUri)
            .setArchiveDir(archiveDir)
            .setSourceDir(sourceDir)
            .setBadFilesDir(badFilesDir)
            .setReaderType(Configs.TEXT); // reading the source shows this line cannot be omitted
    RecordFormat format = new DelimitedRecordFormat()
            .withFieldDelimiter("\t");
    FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/storm/output/");
    SyncPolicy syncPolicy = new CountSyncPolicy(1000);
    FileRotationPolicy fileRotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
    HdfsBolt hdfsBolt = new HdfsBolt().withFsUrl(hdfsUri)
            .withRecordFormat(format)
            .withRotationPolicy(fileRotationPolicy)
            .withSyncPolicy(syncPolicy)
            .withFileNameFormat(fileNameFormat);
    // 3 - create and configure topology
    Config conf = new Config();
    /*conf.setNumWorkers(1);
    conf.setNumAckers(1);
    conf.setMaxTaskParallelism(1);
    conf.setDebug(true);
    conf.registerMetricsConsumer(LoggingMetricsConsumer.class);*/
    TopologyBuilder topologyBuilder = new TopologyBuilder();
    topologyBuilder.setSpout("hdfsspout", hdfsSpout, 1);
    topologyBuilder.setBolt("hdfsbolt", hdfsBolt, 1)
            .shuffleGrouping("hdfsspout");
    // 4 - submit topology, wait for a few min and terminate it
    // Map clusterConf = Utils.readStormConfig();
    String topologyName = "outlierTopology";
    LocalCluster localCluster = new LocalCluster();
    StormTopology stormTopology = topologyBuilder.createTopology();
    stormTopology.validate();
    localCluster.submitTopology(topologyName, conf, stormTopology);
    // StormSubmitter.submitTopologyWithProgressBar(topologyName, config, topologyBuilder.createTopology());
    // Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient();
    Utils.sleep(20000);
    localCluster.shutdown();
    //kill(client, topologyName);
}

NOTES:

The storm-hdfs README contains this sentence: "Hdfs spout assumes that the files being made visible to it in the monitored directory are NOT actively being written to."
That is, any file that becomes visible in the directory the spout monitors must already be completely written; if a file is still being written while the spout picks it up, things will go wrong.
The suggested workarounds, quoted below (a small code sketch of the rename approach follows the quote), are:

Only after a file is completely written should it be made visible to the spout.
This can be achieved by either writing the files out to another directory and
once completely written, move it to the monitored directory.
Alternatively the file can be created with a ‘.ignore’ suffix in the monitored
directory and after data is completely written, rename it without the suffix.
File names with a ‘.ignore’ suffix are ignored by the spout.
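
One way to follow that advice from Java is to write the file under a name the spout ignores and rename it into place only after writing has finished. A rough sketch using the HDFS FileSystem API (the file names here are only examples):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Write with a '.ignore' suffix so HdfsSpout skips the file, then rename it
// once the data is completely written.
FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), new Configuration());
Path tmp = new Path("/storm/data/new_data.txt.ignore"); // example file name
Path done = new Path("/storm/data/new_data.txt");
FSDataOutputStream out = fs.create(tmp);
out.writeBytes("some record\n"); // ... write the complete file here ...
out.close();
fs.rename(tmp, done); // only now does the spout see the file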

Troubleshooting

No FileSystem for scheme: hdfs

You may run into a No FileSystem for scheme: hdfs error. It happens when the project is built with Maven and the jars that each contain an org.apache.hadoop.fs.FileSystem service entry are packaged into a single jar, so the entries conflict with each other.

java.io.IOException: No FileSystem for scheme: hdfs
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2644)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2651)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)

Solution

Add the following inside the <plugins> section of the Maven pom.xml (an alternative workaround is sketched after the snippet):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>1.4</version>
  <configuration>
    <createDependencyReducedPom>true</createDependencyReducedPom>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <transformer
              implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
          <transformer
              implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass></mainClass>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
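
As an aside (this alternative is not from the original write-up): if you prefer not to change the shade plugin configuration, you can also register the hdfs implementation class on the Hadoop Configuration yourself, so the lookup no longer depends on the merged service file.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Alternative workaround (assumption, not from the storm-hdfs docs): bind the
// hdfs scheme to DistributedFileSystem explicitly instead of relying on the
// META-INF/services entry that the fat jar may have clobbered.
Configuration conf = new Configuration();
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);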

ERROR org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died

When submitting the topology with LocalCluster, the following error appeared:

ERROR org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
org.apache.storm.generated.InvalidTopologyException: null
at org.apache.storm.daemon.common$validate_structure_BANG_.invoke(common.clj:185) ~[storm-core-1.1.0.jar:1.1.0]
at org.apache.storm.daemon.common$system_topology_BANG_.invoke(common.clj:378) ~[storm-core-1.1.0.jar:1.1.0]
at org.apache.storm.daemon.nimbus$mk_reified_nimbus$reify__10782.submitTopologyWithOpts(nimbus.clj:1694) ~[storm-core-1.1.0.jar:1.1.0]
at org.apache.storm.daemon.nimbus$mk_reified_nimbus$reify__10782.submitTopology(nimbus.clj:1726) ~[storm-core-1.1.0.jar:1.1.0]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_131]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_131]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_131]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_131]
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) ~[clojure-1.7.0.jar:?]
at org.apache.storm.testing$submit_local_topology.invoke(testing.clj:310) ~[storm-core-1.1.0.jar:1.1.0]
at org.apache.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:49) ~[storm-core-1.1.0.jar:1.1.0]
at org.apache.storm.LocalCluster.submitTopology(Unknown Source) ~[storm-core-1.1.0.jar:1.1.0]
at storm.StreamOutlierTopology.main(StreamOutlierTopology.java:69) ~[classes/:?]

Solution

The spout example on the official site seems to leave one thing out: the files on my HDFS are plain text, so the reader type has to be set explicitly with .setReaderType(Configs.TEXT).

HdfsSpout hdfsSpout = new HdfsSpout().withOutputFields("hdfsspout")
        .setHdfsUri(hdfsUri)
        .setArchiveDir(archiveDir)
        .setSourceDir(sourceDir)
        .setBadFilesDir(badFilesDir)
        .setReaderType(Configs.TEXT);