在基于Hadoop平台的很多應用場景中,我們需要對資料進行離線和實時分析,離線分析可以很容易地借助于Hive來實作統計分析,但是對于實時的需求Hive就不合适了。實時應用場景可以使用Storm,它是一個實時處理系統,它為實時處理類應用提供了一個計算模型,可以很容易地進行程式設計處理。為了統一離線和實時計算,一般情況下,我們都希望将離線和實時計算的資料源的集合統一起來作為輸入,然後将資料的流向分别經由實時系統和離線分析系統,分别進行分析處理,這時我們可以考慮将資料源(如使用Flume收集日志)直接連接配接一個消息中間件,如Kafka,可以整合Flume+Kafka,Flume作為消息的Producer,生産的消息資料(日志資料、業務請求資料等等)釋出到Kafka中,然後通過訂閱的方式,使用Storm的Topology作為消息的Consumer,在Storm叢集中分别進行如下兩個需求場景的處理:
直接使用Storm的Topology對資料進行實時分析處理
整合Storm+HDFS,将消息處理後寫入HDFS進行離線分析處理
實時處理,隻要開發滿足業務需要的Topology即可,不做過多說明。這裡,我們主要從安裝配置Kafka、Storm,以及整合Kafka+Storm、整合Storm+HDFS、整合Kafka+Storm+HDFS這幾點來配置實踐,滿足上面提出的一些需求。配置實踐使用的軟體包如下所示:
zookeeper-3.4.5.tar.gz
kafka_2.9.2-0.8.1.1.tgz
apache-storm-0.9.2-incubating.tar.gz
hadoop-2.2.0.tar.gz
程式配置運作所基于的作業系統為CentOS 5.11。
Kafka安裝配置
我們使用3台機器搭建Kafka叢集:
1
192.168.4.142 h1
2
192.168.4.143 h2
3
192.168.4.144 h3
在安裝Kafka叢集之前,這裡沒有使用Kafka自帶的Zookeeper,而是獨立安裝了一個Zookeeper叢集,也是使用這3台機器,保證Zookeeper叢集正常運作。
首先,在h1上準備Kafka安裝檔案,執行如下指令:
cd /usr/local/
wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
tar xvzf kafka_2.9.2-0.8.1.1.tgz
4
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
5
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
修改配置檔案/usr/local/kafka/config/server.properties,修改如下内容:
broker.id=0
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka
這裡需要說明的是,預設Kafka會使用ZooKeeper預設的/路徑,這樣有關Kafka的ZooKeeper配置就會散落在根路徑下面,如果你有其他的應用也在使用ZooKeeper叢集,檢視ZooKeeper中資料可能會不直覺,是以強烈建議指定一個chroot路徑,直接在zookeeper.connect配置項中指定:
而且,需要手動在ZooKeeper中建立路徑/kafka,使用如下指令連接配接到任意一台ZooKeeper伺服器:
cd /usr/local/zookeeper
bin/zkCli.sh
在ZooKeeper執行如下指令建立chroot路徑:
create /kafka ''
這樣,每次連接配接Kafka叢集的時候(使用--zookeeper選項),也必須使用帶chroot路徑的連接配接字元串,後面會看到。
然後,将配置好的安裝檔案同步到其他的h2、h3節點上:
scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h2:/usr/local/
scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h3:/usr/local/
最後,在h2、h3節點上配置,執行如下指令:
并修改配置檔案/usr/local/kafka/config/server.properties内容如下所示:
broker.id=1 # 在h1修改
broker.id=2 # 在h2修改
因為Kafka叢集需要保證各個Broker的id在整個叢集中必須唯一,需要調整這個配置項的值(如果在單機上,可以通過建立多個Broker程序來模拟分布式的Kafka叢集,也需要Broker的id唯一,還需要修改一些配置目錄的資訊)。
在叢集中的h1、h2、h3這三個節點上分别啟動Kafka,分别執行如下指令:
bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
可以通過檢視日志,或者檢查程序狀态,保證Kafka叢集啟動成功。
我們建立一個名稱為my-replicated-topic5的Topic,5個分區,并且複制因子為3,執行如下指令:
bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
檢視建立的Topic,執行如下指令:
bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5
結果資訊如下所示:
Topic:my-replicated-topic5 PartitionCount:5 ReplicationFactor:3 Configs:
Topic: my-replicated-topic5 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: my-replicated-topic5 Partition: 1 Leader: 0 Replicas: 1,0,2 Isr: 0,2,1
Topic: my-replicated-topic5 Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 2,0,1
Topic: my-replicated-topic5 Partition: 3 Leader: 0 Replicas: 0,1,2 Isr: 0,2,1
6
Topic: my-replicated-topic5 Partition: 4 Leader: 2 Replicas: 1,2,0 Isr: 2,0,1
上面Leader、Replicas、Isr的含義如下:
Partition: 分區
Leader : 負責讀寫指定分區的節點
Replicas : 複制該分區log的節點清單
Isr : "in-sync" replicas,目前活躍的副本清單(是一個子集),并且可能成為Leader
我們可以通過Kafka自帶的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh腳本,來驗證示範如果釋出消息、消費消息。
在一個終端,啟動Producer,并向我們上面建立的名稱為my-replicated-topic5的Topic中生産消息,執行如下腳本:
bin/kafka-console-producer.sh --broker-list h1:9092,h2:9092,h3:9092 --topic my-replicated-topic5
在另一個終端,啟動Consumer,并訂閱我們上面建立的名稱為my-replicated-topic5的Topic中生産的消息,執行如下腳本:
bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5
可以在Producer終端上輸入字元串消息行,然後回車,就可以在Consumer終端上看到消費者消費的消息内容。
也可以參考Kafka的Producer和Consumer的Java API,通過API編碼的方式來實作消息生産和消費的處理邏輯。
Storm安裝配置
Storm叢集也依賴Zookeeper叢集,要保證Zookeeper叢集正常運作。Storm的安裝配置比較簡單,我們仍然使用下面3台機器搭建:
首先,在h1節點上,執行如下指令安裝:
wget http://mirror.bit.edu.cn/apache/incubator/storm/apache-storm-0.9.2-incubating/apache-storm-0.9.2-incubating.tar.gz
tar xvzf apache-storm-0.9.2-incubating.tar.gz
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
然後,修改配置檔案conf/storm.yaml,内容如下所示:
storm.zookeeper.servers:
- "h1"
- "h2"
- "h3"
storm.zookeeper.port: 2181
#
07
nimbus.host: "h1"
08
09
supervisor.slots.ports:
10
- 6700
11
- 6701
12
- 6702
13
- 6703
14
15
storm.local.dir: "/tmp/storm"
将配置好的安裝檔案,分發到其他節點上:
scp -r /usr/local/apache-storm-0.9.2-incubating/ h2:/usr/local/
scp -r /usr/local/apache-storm-0.9.2-incubating/ h3:/usr/local/
Storm叢集的主節點為Nimbus,從節點為Supervisor,我們需要在h1上啟動Nimbus服務,在從節點h2、h3上啟動Supervisor服務:
bin/storm nimbus &
bin/storm supervisor &
為了友善監控,可以啟動Storm UI,可以從Web頁面上監控Storm Topology的運作狀态,例如在h2上啟動:
bin/storm ui &
這樣可以通過通路http://h2:8080/來檢視Topology的運作狀況。
整合Kafka+Storm
消息通過各種方式進入到Kafka消息中間件,比如可以通過使用Flume來收集日志資料,然後在Kafka中路由暫存,然後再由實時計算程式Storm做實時分析,這時我們就需要将在Storm的Spout中讀取Kafka中的消息,然後交由具體的Spot元件去分析處理。實際上,apache-storm-0.9.2-incubating這個版本的Storm已經自帶了一個內建Kafka的外部插件程式storm-kafka,可以直接使用,例如我使用的Maven依賴配置,如下所示:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.2-incubating</version>
<scope>provided</scope>
</dependency>
<artifactId>storm-kafka</artifactId>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
16
<exclusions>
17
<exclusion>
18
<groupId>org.apache.zookeeper</groupId>
19
<artifactId>zookeeper</artifactId>
20
</exclusion>
21
22
<groupId>log4j</groupId>
23
<artifactId>log4j</artifactId>
24
25
</exclusions>
26
下面,我們開發了一個簡單WordCount示例程式,從Kafka讀取訂閱的消息行,通過空格拆分出單個單詞,然後再做詞頻統計計算,實作的Topology的代碼,如下所示:
package org.shirdrn.storm.examples;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
027
import backtype.storm.topology.TopologyBuilder;
028
import backtype.storm.topology.base.BaseRichBolt;
029
import backtype.storm.tuple.Fields;
030
import backtype.storm.tuple.Tuple;
031
import backtype.storm.tuple.Values;
032
033
public class MyKafkaTopology {
034
035
public static class KafkaWordSplitter extends BaseRichBolt {
036
037
private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
038
private static final long serialVersionUID = 886149197481637894L;
039
private OutputCollector collector;
040
041
@Override
042
public void prepare(Map stormConf, TopologyContext context,
043
OutputCollector collector) {
044
this.collector = collector;
045
}
046
047
048
public void execute(Tuple input) {
049
String line = input.getString(0);
050
LOG.info("RECV[kafka -> splitter] " + line);
051
String[] words = line.split("\\s+");
052
for(String word : words) {
053
LOG.info("EMIT[splitter -> counter] " + word);
054
collector.emit(input, new Values(word, 1));
055
}
056
collector.ack(input);
057
058
059
060
public void declareOutputFields(OutputFieldsDeclarer declarer) {
061
declarer.declare(new Fields("word", "count"));
062
063
064
}
065
066
public static class WordCounter extends BaseRichBolt {
067
068
private static final Log LOG = LogFactory.getLog(WordCounter.class);
069
070
071
private Map<String, AtomicInteger> counterMap;
072
073
074
075
076
this.collector = collector;
077
this.counterMap = new HashMap<String, AtomicInteger>();
078
079
080
081
082
String word = input.getString(0);
083
int count = input.getInteger(1);
084
LOG.info("RECV[splitter -> counter] " + word + " : " + count);
085
AtomicInteger ai = this.counterMap.get(word);
086
if(ai == null) {
087
ai = new AtomicInteger();
088
this.counterMap.put(word, ai);
089
090
ai.addAndGet(count);
091
092
LOG.info("CHECK statistics map: " + this.counterMap);
093
094
095
096
public void cleanup() {
097
LOG.info("The final result:");
098
Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();
099
while(iter.hasNext()) {
100
Entry<String, AtomicInteger> entry = iter.next();
101
LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
102
103
104
105
106
107
108
109
110
111
112
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
113
String zks = "h1:2181,h2:2181,h3:2181";
114
String topic = "my-replicated-topic5";
115
String zkRoot = "/storm"; // default zookeeper root configuration for storm
116
String id = "word";
117
118
BrokerHosts brokerHosts = new ZkHosts(zks);
119
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
120
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
121
spoutConf.forceFromStart = false;
122
spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
123
spoutConf.zkPort = 2181;
124
125
TopologyBuilder builder = new TopologyBuilder();
126
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // Kafka我們建立了一個5分區的Topic,這裡并行度設定為5
127
builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");
128
builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));
129
130
Config conf = new Config();
131
132
String name = MyKafkaTopology.class.getSimpleName();
133
if (args != null && args.length > 0) {
134
// Nimbus host name passed from command line
135
conf.put(Config.NIMBUS_HOST, args[0]);
136
conf.setNumWorkers(3);
137
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
138
} else {
139
conf.setMaxTaskParallelism(3);
140
LocalCluster cluster = new LocalCluster();
141
cluster.submitTopology(name, conf, builder.createTopology());
142
Thread.sleep(60000);
143
cluster.shutdown();
144
145
146
}
上面程式,在本地調試(使用LocalCluster)不需要輸入任何參數,送出到實際叢集中運作時,需要傳遞一個參數,該參數為Nimbus的主機名稱。
通過Maven建構,生成一個包含依賴的single jar檔案(不要把Storm的依賴包添加進去),例如storm-examples-0.0.1-SNAPSHOT.jar,在送出Topology程式到Storm叢集之前,因為用到了Kafka,需要拷貝一下依賴jar檔案到Storm叢集中的lib目錄下面:
cp /usr/local/kafka/libs/kafka_2.9.2-0.8.1.1.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/scala-library-2.9.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/snappy-java-1.0.5.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/zkclient-0.3.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/log4j-1.2.15.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/slf4j-api-1.7.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/
然後,就可以送出我們開發的Topology程式了:
bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1
可以通過檢視日志檔案(logs/目錄下)或者Storm UI來監控Topology的運作狀況。如果程式沒有錯誤,可以使用前面我們使用的Kafka Producer來生成消息,就能看到我們開發的Storm Topology能夠實時接收到并進行處理。
上面Topology實作代碼中,有一個很關鍵的配置對象SpoutConfig,配置屬性如下所示:
spoutConf.forceFromStart = false;
該配置是指,如果該Topology因故障停止處理,下次正常運作時是否從Spout對應資料源Kafka中的該訂閱Topic的起始位置開始讀取,如果forceFromStart=true,則之前處理過的Tuple還要重新處理一遍,否則會從上次處理的位置繼續處理,保證Kafka中的Topic資料不被重複處理,是在資料源的位置進行狀态記錄。
整合Storm+HDFS
Storm實時計算叢集從Kafka消息中間件中消費消息,有實時處理需求的可以走實時處理程式,還有需要進行離線分析的需求,如寫入到HDFS進行分析。下面實作了一個Topology,代碼如下所示:
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.utils.Utils;
public class StormToHDFSTopology {
public static class EventSpout extends BaseRichSpout {
private static final Log LOG = LogFactory.getLog(EventSpout.class);
private SpoutOutputCollector collector;
private Random rand;
private String[] records;
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
rand = new Random();
records = new String[] {
"10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35",
"10001 ffb52739a29348a67952e47c12da54ef 4.3 GT-I9300 samsung 2 50:CC:F8:E4:22:E2 2014-10-13 12:36:02",
"10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35"
};
public void nextTuple() {
Utils.sleep(1000);
DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
Date d = new Date(System.currentTimeMillis());
String minute = df.format(d);
String record = records[rand.nextInt(records.length)];
LOG.info("EMIT[spout -> hdfs] " + minute + " : " + record);
collector.emit(new Values(minute, record));
declarer.declare(new Fields("minute", "record"));
// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter(" : ");
// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// rotate files
FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/storm/").withPrefix("app_").withExtension(".log");
HdfsBolt hdfsBolt = new HdfsBolt()
.withFsUrl("hdfs://h1:8020")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);
builder.setSpout("event-spout", new EventSpout(), 3);
builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", new Fields("minute"));
String name = StormToHDFSTopology.class.getSimpleName();
上面的處理邏輯,可以對HdfsBolt進行更加詳細的配置,如FileNameFormat、SyncPolicy、FileRotationPolicy(可以設定在滿足什麼條件下,切出一個新的日志,如可以指定多長時間切出一個新的日志檔案,可以指定一個日志檔案大小達到設定值後,再寫一個新日志檔案),更多設定可以參考storm-hdfs,。
上面代碼在打包的時候,需要注意,使用storm-starter自帶的Maven打包配置,可能在将Topology部署運作的時候,會報錯,可以使用maven-shade-plugin這個插件,如下配置所示:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.4</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
整合Kafka+Storm+HDFS
上面分别對整合Kafka+Storm和Storm+HDFS做了實踐,可以将後者的Spout改成前者的Spout,從Kafka中消費消息,在Storm中可以做簡單處理,然後将資料寫入HDFS,最後可以在Hadoop平台上對資料進行離線分析處理。下面,寫了一個簡單的例子,從Kafka消費消息,然後經由Storm處理,寫入到HDFS存儲,代碼如下所示:
public class DistributeWordTopology {
public static class KafkaWordToUpperCase extends BaseRichBolt {
private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
private static final long serialVersionUID = -5207232012035109026L;
String line = input.getString(0).trim();
if(!line.isEmpty()) {
String upperLine = line.toUpperCase();
LOG.info("EMIT[splitter -> counter] " + upperLine);
collector.emit(input, new Values(upperLine, upperLine.length()));
declarer.declare(new Fields("line", "len"));
public static class RealtimeBolt extends BaseRichBolt {
private static final long serialVersionUID = -4115132557403913367L;
LOG.info("REALTIME: " + line);
// Configure Kafka
// Configure HDFS bolt
.withFieldDelimiter("\t"); // use "\t" instead of "," for field delimiter
SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples
FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files
.withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format
// configure & build topology
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
builder.setBolt("to-upper", new KafkaWordToUpperCase(), 3).shuffleGrouping("kafka-reader");
builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping("to-upper");
builder.setBolt("realtime", new RealtimeBolt(), 2).shuffleGrouping("to-upper");
// submit topology
String name = DistributeWordTopology.class.getSimpleName();
String nimbus = args[0];
conf.put(Config.NIMBUS_HOST, nimbus);
147
148
149
150
上面代碼中,名稱為to-upper的Bolt将接收到的字元串行轉換成大寫以後,會将處理過的資料向後面的hdfs-bolt、realtime這兩個Bolt各發一份拷貝,然後由這兩個Bolt分别根據實際需要(實時/離線)單獨處理。
打包後,在Storm叢集上部署并運作這個Topology:
bin/storm jar ~/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.DistributeWordTopology h1
可以通過Storm UI檢視Topology運作情況,可以檢視HDFS上生成的資料。
參考連結
http://kafka.apache.org/
http://kafka.apache.org/documentation.html
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
http://storm.apache.org/
http://storm.apache.org/documentation/Tutorial.html
http://storm.apache.org/documentation/FAQ.html
https://github.com/ptgoetz/storm-hdfs
轉載:http://shiyanjun.cn/archives/934.html