I. Flume configuration
Flume 1.6 or later is required.
Contents of flume-conf.properties; the sink acts as a Kafka producer.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/airib/work/log.log
# Describe the sink
#a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start Flume:
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console
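As a quick smoke test (a sketch assuming the topic test and the localhost:9092 broker from the configuration above, plus the console consumer shown in the Kafka commands below), append a line to the tailed file and it should appear on the topic:
echo "hello flume to kafka" >> /home/airib/work/log.log
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning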
II. Kafka consumer (Java source code)
package com.hgp.kafka.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsumer {

    private final ConsumerConnector consumer;

    private KafkaConsumer() {
        Properties props = new Properties();
        // ZooKeeper connection
        props.put("zookeeper.connect", "localhost:2181");
        // group.id identifies a consumer group
        props.put("group.id", "jd-group");
        // ZooKeeper session timeout
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        // serializer class
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("test", new Integer(1));

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get("test").get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext())
            System.out.println(it.next().message());
    }

    public static void main(String[] args) {
        new KafkaConsumer().consume();
    }
}
Kafka startup commands
Start the ZooKeeper server:
bin/zookeeper-server-start.sh config/zookeeper.properties &
Start the Kafka server:
bin/kafka-server-start.sh config/server.properties &
Run a producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Run a consumer:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
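If the broker is not set to auto-create topics, create the test topic used above before producing; a minimal form for this single-node setup (an assumption, matching the commands above) would be:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test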
III. Example: a Storm topology that consumes from Kafka
package com.hgp.kafka.kafka;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MyKafkaTopology {

    public static class KafkaWordSplitter extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;

        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
        }

        public void execute(Tuple input) {
            String line = input.getString(0);
            LOG.info("RECV[kafka -> splitter] " + line);
            String[] words = line.split("\\s+");
            for (String word : words) {
                LOG.info("EMIT[splitter -> counter] " + word);
                collector.emit(input, new Values(word, 1));
            }
            collector.ack(input);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static class WordCounter extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(WordCounter.class);
        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;
        private Map<String, AtomicInteger> counterMap;

        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
            this.counterMap = new HashMap<String, AtomicInteger>();
        }

        public void execute(Tuple input) {
            String word = input.getString(0);
            int count = input.getInteger(1);
            LOG.info("RECV[splitter -> counter] " + word + " : " + count);
            AtomicInteger ai = this.counterMap.get(word);
            if (ai == null) {
                ai = new AtomicInteger();
                this.counterMap.put(word, ai);
            }
            ai.addAndGet(count);
            collector.ack(input);
            LOG.info("CHECK statistics map: " + this.counterMap);
        }

        public void cleanup() {
            LOG.info("The final result:");
            Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();
            while (iter.hasNext()) {
                Entry<String, AtomicInteger> entry = iter.next();
                LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        String zks = "localhost:2181";
        String topic = "test";
        String zkRoot = "/storm"; // default zookeeper root configuration for storm
        String id = "word";

        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.forceFromStart = true;
        spoutConf.zkServers = Arrays.asList(new String[] {"localhost"});
        spoutConf.zkPort = 2181;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // the Kafka topic was created with 5 partitions, so the spout parallelism is set to 5
        builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");
        builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));

        Config conf = new Config();
        String name = MyKafkaTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            // Nimbus host name passed from command line
            conf.put(Config.NIMBUS_HOST, args[0]);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }
}
pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ymm</groupId>
<artifactId>TestStorm</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>TestStorm</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.10.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
</project>
IV. Storm deployment
1) Build the jar: mvn clean package
2) Upload to the Storm cluster and submit: storm jar xxx.jar com.sss.class
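For the topology in this example, step 2 would look roughly as follows (a sketch: the jar name follows from the pom above, and nimbus-host stands for your Nimbus machine, which MyKafkaTopology reads as args[0]):
storm jar TestStorm-0.0.1-SNAPSHOT.jar com.hgp.kafka.kafka.MyKafkaTopology nimbus-host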
1. ZooKeeper
Installation reference
2. Kafka
2.1 Unpack and install
# Make sure Scala is installed; this article uses 2.11.7
tar -xf kafka_2.11-0.9.0.1.tgz
cd kafka_2.11-0.9.0.1
mkdir logs
vim ~/.bash_profile
export KAFKA_HOME=/home/zkpk/kafka_2.11-0.9.0.1
export PATH=$PATH:$KAFKA_HOME/bin
source ~/.bash_profile
2.2 Configuration
2.2.1 server.properties
Only the following four items were changed; everything else keeps its defaults.
# Unique id of this machine in the cluster, the same idea as ZooKeeper's myid
broker.id=0
host.name=hsm01
# Directory where messages are stored; it can be a comma-separated list of directories
# (num.io.threads above should be at least the number of directories). With multiple
# directories, a newly created topic's partitions are persisted to whichever listed
# directory currently holds the fewest partitions.
log.dirs=/home/zkpk/kafka_2.11-0.9.0.1/logs
# Custom ZooKeeper connection (note the /kafka chroot)
zookeeper.connect=hsm01:2181,hss01:2181,hss02:2181/kafka
2.2.2 Copy to the other nodes
scp -r ~/kafka_2.11-0.9.0.1/ hss01:~/
scp -r ~/kafka_2.11-0.9.0.1/ hss02:~/
# Change broker.id and host.name on each node
# Set up the environment variables
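After copying, the per-node values might look like this (the exact ids are up to you as long as they are unique; these are assumed examples):
# on hss01
broker.id=1
host.name=hss01
# on hss02
broker.id=2
host.name=hss02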
2.3 Start
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
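To verify that the broker registered itself, you can look it up in ZooKeeper (a quick check, assuming the /kafka chroot configured above):
zkCli.sh -server hsm01:2181
ls /kafka/brokers/ids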
2.4 Test
# Create a topic
kafka-topics.sh --create --zookeeper hsm01:2181/kafka --replication-factor 1 --partitions 1 --topic shuaige
# Start a console producer (publisher)
kafka-console-producer.sh --broker-list hsm01:9092 --topic shuaige
# Start a console consumer (subscriber)
kafka-console-consumer.sh --zookeeper hsm01:2181/kafka --topic shuaige --from-beginning
# List topics
kafka-topics.sh --zookeeper hsm01:2181/kafka --list
# Describe a topic
kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
# Delete a topic
kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
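Note that the delete command only takes full effect if the broker allows topic deletion; otherwise the topic is merely marked for deletion. If needed, enable it in server.properties:
delete.topic.enable=true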
2.5 References
Kafka (Part 1): Setting up a Kafka cluster
3. Flume
3.1 Unpack and install
# In the /home/zkpk directory
tar -xf apache-flume-1.6.0-bin.tar.gz
mv apache-flume-1.6.0-bin/ flume-1.6.0
# Set up the environment variables
vim .bash_profile
export FLUME_HOME=/home/zkpk/flume-1.6.0
export PATH=$PATH:$FLUME_HOME/bin
3.2 Configuration (integration with Kafka)
The Kafka sink is only available in Flume 1.6.0 and later.
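You can check which version is installed with:
flume-ng version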
3.2.1 flume-env.sh
JAVA_HOME=/opt/jdk1.8.0_45
3.2.2 kafka-sogolog.properties
# configure agent
a1.sources = f1
a1.channels = c1
a1.sinks = k1
# configure the source
a1.sources.f1.type = netcat
a1.sources.f1.bind = localhost
a1.sources.f1.port =
# configure the sink (kafka)
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = sogolog
a1.sinks.k1.brokerList = hsm01:9092,hss01:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
# configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind the source and sink to the channel
a1.sources.f1.channels = c1
a1.sinks.k1.channel = c1
3.3 Start
Start the ZooKeeper service:
$ZOOKEEPER_HOME/bin/zkServer.sh start
Start Kafka:
# Start the broker
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
# Create the topic
kafka-topics.sh --create --zookeeper hsm01:2181/kafka --replication-factor 1 --partitions 1 --topic sogolog
# Start a console consumer
kafka-console-consumer.sh --zookeeper hsm01:2181/kafka --topic sogolog --from-beginning
Start Flume:
flume-ng agent -n a1 -c conf -f conf/kafka-sogolog.properties -Dflume.root.logger=DEBUG,console
Note: a1 in the command is the agent name defined in the configuration file. The -f argument must point to the actual path of the configuration file (here conf/kafka-sogolog.properties).
3.4 Test
Telnet input (screenshot)
Flume collecting the data (screenshot)
Kafka receiving the data (screenshot)
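For reference, the telnet test simply connects to the netcat source and types a few lines (the port below is only an assumed example and must match whatever port was configured for the source in 3.2.2):
telnet localhost 44444
hello kafka
hello storm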
3.5 References
Highly Available Hadoop Platform: Flume NG in practice (illustrated)
Installing Flume on Linux and common problems
Flume NG 1.6 + Kafka 2.11 integration
A custom EventSerializer for the Flume HBase sink
Flume 1.6.0 User Guide
org/apache/flume/tools/GetJavaProperty
4. Storm
4.1 Installation
Storm installation reference
4.2 A simple test
4.2.1 pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bigdata-demo</artifactId>
<groupId>com.zw</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>storm-demo</artifactId>
<packaging>jar</packaging>
<name>storm-demo</name>
<url>http://maven.apache.org</url>
<repositories>
<repository>
<id>github-releases</id>
<url>http://oss.sonatype.org/content/repositories/github-releases</url>
</repository>
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
<repository>
<id>twitter4j</id>
<url>http://twitter4j.org/maven2</url>
</repository>
</repositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<storm.version>0.9.7</storm.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<!--
when running directly with java, change this to compile;
when running with maven, use provided
-->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.theoryinpractise</groupId>
<artifactId>clojure-maven-plugin</artifactId>
<version>1.3.8</version>
<extensions>true</extensions>
<configuration>
<sourceDirectories>
<sourceDirectory>src/clj</sourceDirectory>
</sourceDirectories>
</configuration>
<executions>
<execution>
<id>compile</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test</id>
<phase>test</phase>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
Note the scope of the storm-core dependency.
4.2.2 HelloWorldSpout.java
package com.zw.storm.helloworld;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

/**
 * A spout is the bridge to the outside world: it can pull data from a database
 * according to some rule, or take tasks from a distributed queue.
 * <p>
 * Emits tuples driven by a random number generator.
 * </p>
 *
 * Created by zhangws on 16/10/3.
 */
public class HelloWorldSpout extends BaseRichSpout {

    // collector used to emit data
    private SpoutOutputCollector collector;

    private int referenceRandom;

    private static final int MAX_RANDOM = 10; // upper bound of the random draw (value elided in the original; 10 is assumed)

    public HelloWorldSpout() {
        final Random rand = new Random();
        referenceRandom = rand.nextInt(MAX_RANDOM);
    }

    /**
     * Declares the field id. The id is not used in the simple case, but matters a lot
     * when tuples are grouped by field.
     * <p>
     * The declarer is quite powerful; declarer.declareStream() can also be called
     * to define a stream id, which allows more complex stream topologies.
     * </p>
     * @param outputFieldsDeclarer
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    /**
     * Initializes the collector.
     *
     * @param map
     * @param topologyContext
     * @param spoutOutputCollector
     */
    @Override
    public void open(Map map, TopologyContext topologyContext,
                     SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    /**
     * Each call emits one tuple into the Storm cluster; Storm calls this method over and over.
     */
    @Override
    public void nextTuple() {
        Utils.sleep(100); // pause between emits (interval elided in the original; 100 ms is assumed)
        final Random rand = new Random();
        int instanceRandom = rand.nextInt(MAX_RANDOM);
        if (instanceRandom == referenceRandom) {
            collector.emit(new Values("Hello World"));
        } else {
            collector.emit(new Values("Other Random Word"));
        }
    }
}
4.2.3 HelloWorldBolt.java
package com.zw.storm.helloworld;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

/**
 * Receives the data emitted by the spout, does some simple processing, and emits the result.
 * <p>
 * Reads the produced tuples and implements the required counting logic.
 * </p>
 *
 * Created by zhangws on 16/10/4.
 */
public class HelloWorldBolt extends BaseBasicBolt {

    private int myCount;

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String test = tuple.getStringByField("sentence");
        if ("Hello World".equals(test)) {
            myCount++;
            System.out.println("==========================: " + myCount);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
4.2.4 HelloWorldTopology.java
package com.zw.storm.helloworld;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

/**
 * mvn compile exec:java -Dexec.classpathScope=compile -Dexec.mainClass=com.zw.storm.helloworld.HelloWorldTopology
 * Created by zhangws on 16/10/4.
 */
public class HelloWorldTopology {

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // Set the spout and its parallelism hint, which controls how many threads it gets in the cluster.
        builder.setSpout("randomHelloWorld", new HelloWorldSpout(), 1); // parallelism value elided in the original; 1 is assumed
        // Set the processing bolt and its parallelism; it receives the spout's output via shuffle grouping.
        builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 2) // parallelism value elided in the original; 2 is assumed
                .shuffleGrouping("randomHelloWorld");

        Config config = new Config();
        config.setDebug(true);

        if (args != null && args.length > 0) {
            config.setNumWorkers(3); // worker count elided in the original; 3 is assumed
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            // Startup code for running in local mode.
            config.setMaxTaskParallelism(1); // value elided in the original; 1 is assumed
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", config, builder.createTopology());
            Utils.sleep(10000); // run for a while before shutting down (duration elided; 10 s is assumed)
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}
4.2.5 Run
# maven
mvn compile exec:java -Dexec.classpathScope=compile -Dexec.mainClass=com.zw.storm.helloworld.HelloWorldTopology
# run directly with java:
# change the scope of the storm-core dependency to compile first
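For the java-direct case, the change mentioned above is just the scope of the storm-core dependency already shown in the pom, e.g.:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<scope>compile</scope>
</dependency>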
Result
...
[Thread--HelloWorldBolt] INFO backtype.storm.daemon.executor - Processing received message source: randomHelloWorld:, stream: default, id: {}, [Other Random Word]
[Thread--randomHelloWorld] INFO backtype.storm.daemon.task - Emitting: randomHelloWorld default [Hello World]
[Thread--HelloWorldBolt] INFO backtype.storm.daemon.executor - Processing received message source: randomHelloWorld:, stream: default, id: {}, [Hello World]
==========================:
[Thread--randomHelloWorld] INFO backtype.storm.daemon.task - Emitting: randomHelloWorld default [Other Random Word]
[Thread--HelloWorldBolt] INFO backtype.storm.daemon.executor - Processing received message source: randomHelloWorld:, stream: default, id: {}, [Other Random Word]
[Thread--randomHelloWorld] INFO backtype.storm.daemon.task - Emitting: randomHelloWorld default [Other Random Word]
...
4.3 Integration with Kafka
4.3.1 pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bigdata-demo</artifactId>
<groupId>com.zw</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka2storm</artifactId>
<packaging>jar</packaging>
<name>kafka2storm</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<storm.version>0.9.7</storm.version>
<kafka.version>0.9.0.1</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<!--
when running directly with java, change this to compile;
when running with maven, use provided
-->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
4.3.2 MessageScheme.java
package com.zw.kafka.storm;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * Converts the data coming out of Kafka into a string.
 * <p>
 * KafkaSpout is the spout that ships with Storm; to use it you implement the
 * Scheme interface yourself, which is responsible for parsing the needed data
 * out of the message stream.
 * </p>
 *
 * Created by zhangws on 16/10/2.
 */
public class MessageScheme implements Scheme {

    public List<Object> deserialize(byte[] bytes) {
        try {
            String msg = new String(bytes, "UTF-8");
            return new Values(msg);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

    public Fields getOutputFields() {
        return new Fields("msg");
    }
}
4.3.3 SequenceBolt.java
package com.zw.kafka.storm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;

/**
 * Saves the output to a file.
 * <p>
 * Writes the received messages to the file kafkastorm.out.
 * </p>
 *
 * Created by zhangws on 16/10/2.
 */
public class SequenceBolt extends BaseBasicBolt {

    /**
     * Process the input tuple and optionally emit new tuples based on the input tuple.
     * <p>
     * All acking is managed for you. Throw a FailedException if you want to fail the tuple.
     *
     * @param input
     * @param collector
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = (String) input.getValue(0);
        System.out.println("==============" + word);

        // write to file (note: the file is reopened and overwritten for every tuple)
        try {
            DataOutputStream out_file = new DataOutputStream(new FileOutputStream("/home/zkpk/kafkastorm.out"));
            out_file.writeUTF(word);
            out_file.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        collector.emit(new Values(word));
    }

    /**
     * Declare the output schema for all the streams of this topology.
     *
     * @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}
4.3.4 KafkaTopology.java
package com.zw.kafka.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;

import java.util.HashMap;
import java.util.Map;

/**
 * Configures Kafka and submits the topology to Storm.
 * <p>
 * topic1 (sogolog) is the topic on which Kafka receives data from the producer;
 * topic2 is the topic that the KafkaBolt (the Storm-side bolt) writes to. The topic2
 * part can be left out without any problem; it simply acts as a relay.
 * </p>
 * Created by zhangws on 16/10/2.
 */
public class KafkaTopology {

    private static final String BROKER_ZK_LIST = "hsm01:2181,hss01:2181,hss02:2181";
    private static final String ZK_PATH = "/kafka/brokers";

    public static void main(String[] args) throws Exception {
        // Configure the ZooKeeper address
        BrokerHosts brokerHosts = new ZkHosts(BROKER_ZK_LIST, ZK_PATH);
        // Configure the Kafka topic to subscribe to, plus the ZooKeeper node path and consumer id
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "sogolog", "/kafka", "kafka");

        // Configure kafka.broker.properties for the KafkaBolt
        Config conf = new Config();
        Map<String, String> map = new HashMap<String, String>();
        // Kafka broker address
        map.put("metadata.broker.list", "hsm01:9092");
        // serializer.class is the message serializer class
        map.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", map);
        // Topic that the KafkaBolt writes to
        conf.put("topic", "topic2");

        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
        // spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1); // parallelism value elided in the original; 1 is assumed
        builder.setBolt("kafka-bolt", new SequenceBolt()).shuffleGrouping("kafka-spout");
        builder.setBolt("kafka-bolt2", new KafkaBolt<String, Integer>()).shuffleGrouping("kafka-bolt");

        String name = KafkaTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            // Nimbus host name passed from command line
            conf.put(Config.NIMBUS_HOST, args[0]);
            conf.setNumWorkers(3); // worker count elided in the original; 3 is assumed
            StormSubmitter.submitTopology(name, conf, builder.createTopology());
        } else {
            // run in local mode
            conf.setMaxTaskParallelism(1); // value elided in the original; 1 is assumed
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Utils.sleep(60000); // duration elided in the original; 60 s is assumed
            cluster.killTopology(name);
            cluster.shutdown();
        }
    }
}
4.3.5 Copy the Kafka dependency jars into Storm's lib directory
cp ~/kafka_2.11-0.9.0.1/libs/kafka_2.11-0.9.0.1.jar ~/storm-0.9.7/lib/
cp ~/kafka_2.11-0.9.0.1/libs/scala-library-2.11.7.jar ~/storm-0.9.7/lib/
cp ~/kafka_2.11-0.9.0.1/libs/metrics-core-*.jar ~/storm-0.9.7/lib/
cp ~/kafka_2.11-0.9.0.1/libs/log4j-*.jar ~/storm-0.9.7/lib/
# cp ~/kafka_2.11-0.9.0.1/libs/slf4j-api-1.7.6.jar ~/storm-0.9.7/lib/
cp ~/kafka_2.11-0.9.0.1/libs/jopt-simple-*.jar ~/storm-0.9.7/lib/
4.3.6 Run
Start the ZooKeeper and Storm clusters.
Start Kafka:
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
Run the KafkaTopology:
storm jar /home/zkpk/doc/kafka2storm-1.0-SNAPSHOT-jar-with-dependencies.jar com.zw.kafka.storm.KafkaTopology hsm01
Start a console consumer:
kafka-console-consumer.sh --zookeeper hsm01:2181/kafka --topic topic2 --from-beginning
Start a Kafka producer:
kafka-console-producer.sh --broker-list hsm01:9092 --topic sogolog
Result
# Producer
[zkpk@hsm01 ~]$ kafka-console-producer.sh --broker-list hsm01:9092 --topic sogolog
nihao
hello storm-kafka
你好,storm-kafka
# Consumer showing the output processed by Storm
[zkpk@hsm01 ~]$ kafka-console-consumer.sh --zookeeper hsm01:2181/kafka --topic topic2 --from-beginning
nihao
hello storm-kafka
你好,storm-kafka
4.4 References
Debugging storm-starter in the IDE
A summary of problems when testing Kafka-Storm integration
Developing Storm applications that integrate Kafka
log4j problems caused by KafkaSpout
The pom snippet below shows the dependency setup with the conflicting log4j jars excluded:
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.2</version>
<!-- this jar is already present in the Storm runtime, so it is not packed into the final task.jar -->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
<!-- kafka_2.10 builds already exist, but tasks failed when I used them, so only kafka_2.9.2 is used here; the Kafka server itself runs the newer 2.10 build -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.2.2</version>
<!-- exclude the following jars; the Storm server already ships log4j, so this avoids conflict errors -->
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>1.4.4</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<encoding>utf-8</encoding>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest></manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<encoding>utf-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
5. Flume, Kafka, and Storm integration test
# Start the consumer for the topic processed by Storm
kafka-console-consumer.sh --zookeeper hsm01:2181/kafka --topic topic2
# Run the KafkaTopology
storm jar /home/zkpk/doc/kafka2storm-1.0-SNAPSHOT-jar-with-dependencies.jar com.zw.kafka.storm.KafkaTopology hsm01
# Start Flume
flume-ng agent -n a1 -c conf -f /home/zkpk/flume-1.6.0/conf/kafka-sogolog.properties -Dflume.root.logger=DEBUG,console
# Copy a file into the directory watched by Flume
cp test.log flume/
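Note that the last step copies a file into a directory watched by Flume, while the kafka-sogolog.properties shown in 3.2.2 uses a netcat source. If a directory-watching source is what you want, a minimal spooling-directory variant of the source (directory path assumed) would look like:
a1.sources.f1.type = spooldir
a1.sources.f1.spoolDir = /home/zkpk/flume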