天天看點

Kafka+Storm+HDFS整合實踐

在基于Hadoop平台的很多應用場景中,我們需要對資料進行離線和實時分析,離線分析可以很容易地借助于Hive來實作統計分析,但是對于實時的需求Hive就不合适了。實時應用場景可以使用Storm,它是一個實時處理系統,它為實時處理類應用提供了一個計算模型,可以很容易地進行程式設計處理。為了統一離線和實時計算,一般情況下,我們都希望将離線和實時計算的資料源的集合統一起來作為輸入,然後将資料的流向分别經由實時系統和離線分析系統,分别進行分析處理,這時我們可以考慮将資料源(如使用Flume收集日志)直接連接配接一個消息中間件,如Kafka,可以整合Flume+Kafka,Flume作為消息的Producer,生産的消息資料(日志資料、業務請求資料等等)釋出到Kafka中,然後通過訂閱的方式,使用Storm的Topology作為消息的Consumer,在Storm叢集中分别進行如下兩個需求場景的處理:

直接使用Storm的Topology對資料進行實時分析處理

整合Storm+HDFS,将消息處理後寫入HDFS進行離線分析處理

實時處理,隻要開發滿足業務需要的Topology即可,不做過多說明。這裡,我們主要從安裝配置Kafka、Storm,以及整合Kafka+Storm、整合Storm+HDFS、整合Kafka+Storm+HDFS這幾點來配置實踐,滿足上面提出的一些需求。配置實踐使用的軟體包如下所示:

zookeeper-3.4.5.tar.gz

kafka_2.9.2-0.8.1.1.tgz

apache-storm-0.9.2-incubating.tar.gz

hadoop-2.2.0.tar.gz

程式配置運作所基于的作業系統為CentOS 5.11。

Kafka安裝配置

我們使用3台機器搭建Kafka叢集:

1

192.168.4.142   h1

2

192.168.4.143   h2

3

192.168.4.144   h3

在安裝Kafka叢集之前,這裡沒有使用Kafka自帶的Zookeeper,而是獨立安裝了一個Zookeeper叢集,也是使用這3台機器,保證Zookeeper叢集正常運作。

首先,在h1上準備Kafka安裝檔案,執行如下指令:

cd /usr/local/

wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz

tar xvzf kafka_2.9.2-0.8.1.1.tgz

4

ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka

5

chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka

修改配置檔案/usr/local/kafka/config/server.properties,修改如下内容:

broker.id=0

zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka

這裡需要說明的是,預設Kafka會使用ZooKeeper預設的/路徑,這樣有關Kafka的ZooKeeper配置就會散落在根路徑下面,如果你有其他的應用也在使用ZooKeeper叢集,檢視ZooKeeper中資料可能會不直覺,是以強烈建議指定一個chroot路徑,直接在zookeeper.connect配置項中指定:

而且,需要手動在ZooKeeper中建立路徑/kafka,使用如下指令連接配接到任意一台ZooKeeper伺服器:

cd /usr/local/zookeeper

bin/zkCli.sh

在ZooKeeper執行如下指令建立chroot路徑:

create /kafka ''

這樣,每次連接配接Kafka叢集的時候(使用--zookeeper選項),也必須使用帶chroot路徑的連接配接字元串,後面會看到。

然後,将配置好的安裝檔案同步到其他的h2、h3節點上:

scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h2:/usr/local/

scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h3:/usr/local/

最後,在h2、h3節點上配置,執行如下指令:

并修改配置檔案/usr/local/kafka/config/server.properties内容如下所示:

broker.id=1  # 在h1修改

broker.id=2  # 在h2修改

因為Kafka叢集需要保證各個Broker的id在整個叢集中必須唯一,需要調整這個配置項的值(如果在單機上,可以通過建立多個Broker程序來模拟分布式的Kafka叢集,也需要Broker的id唯一,還需要修改一些配置目錄的資訊)。

在叢集中的h1、h2、h3這三個節點上分别啟動Kafka,分别執行如下指令:

bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

可以通過檢視日志,或者檢查程序狀态,保證Kafka叢集啟動成功。

我們建立一個名稱為my-replicated-topic5的Topic,5個分區,并且複制因子為3,執行如下指令:

bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5

檢視建立的Topic,執行如下指令:

bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5

結果資訊如下所示:

Topic:my-replicated-topic5     PartitionCount:5     ReplicationFactor:3     Configs:

     Topic: my-replicated-topic5     Partition: 0     Leader: 0     Replicas: 0,2,1     Isr: 0,2,1

     Topic: my-replicated-topic5     Partition: 1     Leader: 0     Replicas: 1,0,2     Isr: 0,2,1

     Topic: my-replicated-topic5     Partition: 2     Leader: 2     Replicas: 2,1,0     Isr: 2,0,1

     Topic: my-replicated-topic5     Partition: 3     Leader: 0     Replicas: 0,1,2     Isr: 0,2,1

6

     Topic: my-replicated-topic5     Partition: 4     Leader: 2     Replicas: 1,2,0     Isr: 2,0,1

上面Leader、Replicas、Isr的含義如下:

Partition: 分區

Leader   : 負責讀寫指定分區的節點

Replicas : 複制該分區log的節點清單

Isr      : "in-sync" replicas,目前活躍的副本清單(是一個子集),并且可能成為Leader

我們可以通過Kafka自帶的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh腳本,來驗證示範如果釋出消息、消費消息。

在一個終端,啟動Producer,并向我們上面建立的名稱為my-replicated-topic5的Topic中生産消息,執行如下腳本:

bin/kafka-console-producer.sh --broker-list h1:9092,h2:9092,h3:9092 --topic my-replicated-topic5

在另一個終端,啟動Consumer,并訂閱我們上面建立的名稱為my-replicated-topic5的Topic中生産的消息,執行如下腳本:

bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5

可以在Producer終端上輸入字元串消息行,然後回車,就可以在Consumer終端上看到消費者消費的消息内容。

也可以參考Kafka的Producer和Consumer的Java API,通過API編碼的方式來實作消息生産和消費的處理邏輯。

Storm安裝配置

Storm叢集也依賴Zookeeper叢集,要保證Zookeeper叢集正常運作。Storm的安裝配置比較簡單,我們仍然使用下面3台機器搭建:

首先,在h1節點上,執行如下指令安裝:

wget http://mirror.bit.edu.cn/apache/incubator/storm/apache-storm-0.9.2-incubating/apache-storm-0.9.2-incubating.tar.gz

tar xvzf apache-storm-0.9.2-incubating.tar.gz

ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm

chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm

然後,修改配置檔案conf/storm.yaml,内容如下所示:

storm.zookeeper.servers:

     - "h1"

     - "h2"

     - "h3"

storm.zookeeper.port: 2181

#

07

nimbus.host: "h1"

08

09

supervisor.slots.ports:

10

    - 6700

11

    - 6701

12

    - 6702

13

    - 6703

14

15

storm.local.dir: "/tmp/storm"

将配置好的安裝檔案,分發到其他節點上:

scp -r /usr/local/apache-storm-0.9.2-incubating/ h2:/usr/local/

scp -r /usr/local/apache-storm-0.9.2-incubating/ h3:/usr/local/

Storm叢集的主節點為Nimbus,從節點為Supervisor,我們需要在h1上啟動Nimbus服務,在從節點h2、h3上啟動Supervisor服務:

bin/storm nimbus &

bin/storm supervisor &

為了友善監控,可以啟動Storm UI,可以從Web頁面上監控Storm Topology的運作狀态,例如在h2上啟動:

bin/storm ui &

這樣可以通過通路http://h2:8080/來檢視Topology的運作狀況。

整合Kafka+Storm

消息通過各種方式進入到Kafka消息中間件,比如可以通過使用Flume來收集日志資料,然後在Kafka中路由暫存,然後再由實時計算程式Storm做實時分析,這時我們就需要将在Storm的Spout中讀取Kafka中的消息,然後交由具體的Spot元件去分析處理。實際上,apache-storm-0.9.2-incubating這個版本的Storm已經自帶了一個內建Kafka的外部插件程式storm-kafka,可以直接使用,例如我使用的Maven依賴配置,如下所示:

<dependency>

     <groupId>org.apache.storm</groupId>

     <artifactId>storm-core</artifactId>

     <version>0.9.2-incubating</version>

     <scope>provided</scope>

</dependency>

     <artifactId>storm-kafka</artifactId>

     <groupId>org.apache.kafka</groupId>

     <artifactId>kafka_2.9.2</artifactId>

     <version>0.8.1.1</version>

16

     <exclusions>

17

          <exclusion>

18

               <groupId>org.apache.zookeeper</groupId>

19

               <artifactId>zookeeper</artifactId>

20

          </exclusion>

21

22

               <groupId>log4j</groupId>

23

               <artifactId>log4j</artifactId>

24

25

     </exclusions>

26

下面,我們開發了一個簡單WordCount示例程式,從Kafka讀取訂閱的消息行,通過空格拆分出單個單詞,然後再做詞頻統計計算,實作的Topology的代碼,如下所示:

package org.shirdrn.storm.examples;

import java.util.Arrays;

import java.util.HashMap;

import java.util.Iterator;

import java.util.Map;

import java.util.Map.Entry;

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import storm.kafka.BrokerHosts;

import storm.kafka.KafkaSpout;

import storm.kafka.SpoutConfig;

import storm.kafka.StringScheme;

import storm.kafka.ZkHosts;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.spout.SchemeAsMultiScheme;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

027

import backtype.storm.topology.TopologyBuilder;

028

import backtype.storm.topology.base.BaseRichBolt;

029

import backtype.storm.tuple.Fields;

030

import backtype.storm.tuple.Tuple;

031

import backtype.storm.tuple.Values;

032

033

public class MyKafkaTopology {

034

035

     public static class KafkaWordSplitter extends BaseRichBolt {

036

037

          private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);

038

          private static final long serialVersionUID = 886149197481637894L;

039

          private OutputCollector collector;

040

041

          @Override

042

          public void prepare(Map stormConf, TopologyContext context,

043

                    OutputCollector collector) {

044

               this.collector = collector;             

045

          }

046

047

048

          public void execute(Tuple input) {

049

               String line = input.getString(0);

050

               LOG.info("RECV[kafka -> splitter] " + line);

051

               String[] words = line.split("\\s+");

052

               for(String word : words) {

053

                    LOG.info("EMIT[splitter -> counter] " + word);

054

                    collector.emit(input, new Values(word, 1));

055

               }

056

               collector.ack(input);

057

058

059

060

          public void declareOutputFields(OutputFieldsDeclarer declarer) {

061

               declarer.declare(new Fields("word", "count"));        

062

063

064

     }

065

066

     public static class WordCounter extends BaseRichBolt {

067

068

          private static final Log LOG = LogFactory.getLog(WordCounter.class);

069

070

071

          private Map<String, AtomicInteger> counterMap;

072

073

074

075

076

               this.collector = collector;   

077

               this.counterMap = new HashMap<String, AtomicInteger>();

078

079

080

081

082

               String word = input.getString(0);

083

               int count = input.getInteger(1);

084

               LOG.info("RECV[splitter -> counter] " + word + " : " + count);

085

               AtomicInteger ai = this.counterMap.get(word);

086

               if(ai == null) {

087

                    ai = new AtomicInteger();

088

                    this.counterMap.put(word, ai);

089

090

               ai.addAndGet(count);

091

092

               LOG.info("CHECK statistics map: " + this.counterMap);

093

094

095

096

          public void cleanup() {

097

               LOG.info("The final result:");

098

               Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();

099

               while(iter.hasNext()) {

100

                    Entry<String, AtomicInteger> entry = iter.next();

101

                    LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());

102

103

104

105

106

107

108

109

110

111

112

     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {

113

          String zks = "h1:2181,h2:2181,h3:2181";

114

          String topic = "my-replicated-topic5";

115

          String zkRoot = "/storm"; // default zookeeper root configuration for storm

116

          String id = "word";

117

118

          BrokerHosts brokerHosts = new ZkHosts(zks);

119

          SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);

120

          spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());

121

          spoutConf.forceFromStart = false;

122

          spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});

123

          spoutConf.zkPort = 2181;

124

125

          TopologyBuilder builder = new TopologyBuilder();

126

          builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // Kafka我們建立了一個5分區的Topic,這裡并行度設定為5

127

          builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");

128

          builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));

129

130

          Config conf = new Config();

131

132

          String name = MyKafkaTopology.class.getSimpleName();

133

          if (args != null && args.length > 0) {

134

               // Nimbus host name passed from command line

135

               conf.put(Config.NIMBUS_HOST, args[0]);

136

               conf.setNumWorkers(3);

137

               StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());

138

          } else {

139

               conf.setMaxTaskParallelism(3);

140

               LocalCluster cluster = new LocalCluster();

141

               cluster.submitTopology(name, conf, builder.createTopology());

142

               Thread.sleep(60000);

143

               cluster.shutdown();

144

145

146

}

上面程式,在本地調試(使用LocalCluster)不需要輸入任何參數,送出到實際叢集中運作時,需要傳遞一個參數,該參數為Nimbus的主機名稱。

通過Maven建構,生成一個包含依賴的single jar檔案(不要把Storm的依賴包添加進去),例如storm-examples-0.0.1-SNAPSHOT.jar,在送出Topology程式到Storm叢集之前,因為用到了Kafka,需要拷貝一下依賴jar檔案到Storm叢集中的lib目錄下面:

cp /usr/local/kafka/libs/kafka_2.9.2-0.8.1.1.jar /usr/local/storm/lib/

cp /usr/local/kafka/libs/scala-library-2.9.2.jar /usr/local/storm/lib/

cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/

cp /usr/local/kafka/libs/snappy-java-1.0.5.jar /usr/local/storm/lib/

cp /usr/local/kafka/libs/zkclient-0.3.jar /usr/local/storm/lib/

cp /usr/local/kafka/libs/log4j-1.2.15.jar /usr/local/storm/lib/

cp /usr/local/kafka/libs/slf4j-api-1.7.2.jar /usr/local/storm/lib/

cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/

然後,就可以送出我們開發的Topology程式了:

bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1

可以通過檢視日志檔案(logs/目錄下)或者Storm UI來監控Topology的運作狀況。如果程式沒有錯誤,可以使用前面我們使用的Kafka Producer來生成消息,就能看到我們開發的Storm Topology能夠實時接收到并進行處理。

上面Topology實作代碼中,有一個很關鍵的配置對象SpoutConfig,配置屬性如下所示:

spoutConf.forceFromStart = false;

該配置是指,如果該Topology因故障停止處理,下次正常運作時是否從Spout對應資料源Kafka中的該訂閱Topic的起始位置開始讀取,如果forceFromStart=true,則之前處理過的Tuple還要重新處理一遍,否則會從上次處理的位置繼續處理,保證Kafka中的Topic資料不被重複處理,是在資料源的位置進行狀态記錄。

整合Storm+HDFS

Storm實時計算叢集從Kafka消息中間件中消費消息,有實時處理需求的可以走實時處理程式,還有需要進行離線分析的需求,如寫入到HDFS進行分析。下面實作了一個Topology,代碼如下所示:

import java.text.DateFormat;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.Random;

import org.apache.storm.hdfs.bolt.HdfsBolt;

import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;

import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;

import org.apache.storm.hdfs.bolt.format.FileNameFormat;

import org.apache.storm.hdfs.bolt.format.RecordFormat;

import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;

import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;

import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;

import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;

import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.utils.Utils;

public class StormToHDFSTopology {

     public static class EventSpout extends BaseRichSpout {

          private static final Log LOG = LogFactory.getLog(EventSpout.class);

          private SpoutOutputCollector collector;

          private Random rand;

          private String[] records;

          public void open(Map conf, TopologyContext context,

                    SpoutOutputCollector collector) {

               rand = new Random();

               records = new String[] {

                         "10001     ef2da82d4c8b49c44199655dc14f39f6     4.2.1     HUAWEI G610-U00     HUAWEI     2     70:72:3c:73:8b:22     2014-10-13 12:36:35",

                         "10001     ffb52739a29348a67952e47c12da54ef     4.3     GT-I9300     samsung     2     50:CC:F8:E4:22:E2     2014-10-13 12:36:02",

                         "10001     ef2da82d4c8b49c44199655dc14f39f6     4.2.1     HUAWEI G610-U00     HUAWEI     2     70:72:3c:73:8b:22     2014-10-13 12:36:35"

               };

          public void nextTuple() {

               Utils.sleep(1000);

               DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");

               Date d = new Date(System.currentTimeMillis());

               String minute = df.format(d);

               String record = records[rand.nextInt(records.length)];

               LOG.info("EMIT[spout -> hdfs] " + minute + " : " + record);

               collector.emit(new Values(minute, record));

               declarer.declare(new Fields("minute", "record"));        

          // use "|" instead of "," for field delimiter

          RecordFormat format = new DelimitedRecordFormat()

                  .withFieldDelimiter(" : ");

          // sync the filesystem after every 1k tuples

          SyncPolicy syncPolicy = new CountSyncPolicy(1000);

          // rotate files

          FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);

          FileNameFormat fileNameFormat = new DefaultFileNameFormat()

                  .withPath("/storm/").withPrefix("app_").withExtension(".log");

          HdfsBolt hdfsBolt = new HdfsBolt()

                  .withFsUrl("hdfs://h1:8020")

                  .withFileNameFormat(fileNameFormat)

                  .withRecordFormat(format)

                  .withRotationPolicy(rotationPolicy)

                  .withSyncPolicy(syncPolicy);

          builder.setSpout("event-spout", new EventSpout(), 3);

          builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", new Fields("minute"));

          String name = StormToHDFSTopology.class.getSimpleName();

上面的處理邏輯,可以對HdfsBolt進行更加詳細的配置,如FileNameFormat、SyncPolicy、FileRotationPolicy(可以設定在滿足什麼條件下,切出一個新的日志,如可以指定多長時間切出一個新的日志檔案,可以指定一個日志檔案大小達到設定值後,再寫一個新日志檔案),更多設定可以參考storm-hdfs,。

上面代碼在打包的時候,需要注意,使用storm-starter自帶的Maven打包配置,可能在将Topology部署運作的時候,會報錯,可以使用maven-shade-plugin這個插件,如下配置所示:

<plugin>

    <groupId>org.apache.maven.plugins</groupId>

    <artifactId>maven-shade-plugin</artifactId>

    <version>1.4</version>

    <configuration>

        <createDependencyReducedPom>true</createDependencyReducedPom>

    </configuration>

    <executions>

        <execution>

            <phase>package</phase>

            <goals>

                <goal>shade</goal>

            </goals>

            <configuration>

                <transformers>

                    <transformer

                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>

                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">

                        <mainClass></mainClass>

                    </transformer>

                </transformers>

            </configuration>

        </execution>

    </executions>

</plugin>

整合Kafka+Storm+HDFS

上面分别對整合Kafka+Storm和Storm+HDFS做了實踐,可以将後者的Spout改成前者的Spout,從Kafka中消費消息,在Storm中可以做簡單處理,然後将資料寫入HDFS,最後可以在Hadoop平台上對資料進行離線分析處理。下面,寫了一個簡單的例子,從Kafka消費消息,然後經由Storm處理,寫入到HDFS存儲,代碼如下所示:

public class DistributeWordTopology {

     public static class KafkaWordToUpperCase extends BaseRichBolt {

          private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);

          private static final long serialVersionUID = -5207232012035109026L;

               String line = input.getString(0).trim();

               if(!line.isEmpty()) {

                    String upperLine = line.toUpperCase();

                    LOG.info("EMIT[splitter -> counter] " + upperLine);

                    collector.emit(input, new Values(upperLine, upperLine.length()));

               declarer.declare(new Fields("line", "len"));        

     public static class RealtimeBolt extends BaseRichBolt {

          private static final long serialVersionUID = -4115132557403913367L;

               LOG.info("REALTIME: " + line);

          // Configure Kafka

          // Configure HDFS bolt

                  .withFieldDelimiter("\t"); // use "\t" instead of "," for field delimiter

          SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples

          FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files

                  .withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format

          // configure & build topology

          builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);

          builder.setBolt("to-upper", new KafkaWordToUpperCase(), 3).shuffleGrouping("kafka-reader");

          builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping("to-upper");

          builder.setBolt("realtime", new RealtimeBolt(), 2).shuffleGrouping("to-upper");

          // submit topology

          String name = DistributeWordTopology.class.getSimpleName();

               String nimbus = args[0];

               conf.put(Config.NIMBUS_HOST, nimbus);

147

148

149

150

上面代碼中,名稱為to-upper的Bolt将接收到的字元串行轉換成大寫以後,會将處理過的資料向後面的hdfs-bolt、realtime這兩個Bolt各發一份拷貝,然後由這兩個Bolt分别根據實際需要(實時/離線)單獨處理。

打包後,在Storm叢集上部署并運作這個Topology:

bin/storm jar ~/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.DistributeWordTopology h1

可以通過Storm UI檢視Topology運作情況,可以檢視HDFS上生成的資料。

參考連結

http://kafka.apache.org/

http://kafka.apache.org/documentation.html

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

http://storm.apache.org/

http://storm.apache.org/documentation/Tutorial.html

http://storm.apache.org/documentation/FAQ.html

https://github.com/ptgoetz/storm-hdfs

轉載:http://shiyanjun.cn/archives/934.html

繼續閱讀