本文用的是jstorm 2.2.1
一、pom引用
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the JStorm 2.2.1 + Kafka demo topology.
  Logging goes through slf4j/logback: every log4j and slf4j-log4j12 artifact is
  excluded from the dependencies below and log4j-over-slf4j is added instead.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>jiankunking</groupId>
    <artifactId>kafkajstorm</artifactId>
    <version>1.0-SNAPSHOT</version>
    <!-- Project home page (was the placeholder "javascript:void(0)", which is not a usable URL). -->
    <url>https://github.com/JianKunKing/jstorm-kafka-plugin-demo</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <!--<scope>test</scope>-->
        </dependency>

        <!--jstorm begin-->
        <dependency>
            <groupId>com.alibaba.jstorm</groupId>
            <artifactId>jstorm-core</artifactId>
            <version>2.2.1</version>
            <!--<scope>provided</scope>-->
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.9.2</artifactId>
            <version>0.8.1</version>
            <exclusions>
                <!-- ZooKeeper is declared explicitly below, so the transitive copy is dropped. -->
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>junit</groupId>
                    <artifactId>junit</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.5</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.5.0</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1</version>
        </dependency>
        <!--jstorm end-->

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.3</version>
        </dependency>

        <!--logback begin-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
            <version>1.7.10</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.0.13</version>
        </dependency>
        <!--log end-->
    </dependencies>

    <build>
        <plugins>
            <!-- Builds a jar-with-dependencies artifact during the package phase. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Compiles for Java 1.6 source and target levels. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
二、自定义bolt
package jiankunking.kafkajstorm.bolts;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleImplExt;
import jiankunking.kafkajstorm.util.ByteUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
/**
* Created by jiankunking on 2017/4/19 16:47.
*/
public class CustomBolt extends BaseBasicBolt
protected final Logger logger = LoggerFactory.getLogger(CustomBolt.class);
public void execute(Tuple input, BasicOutputCollector collector) {
try {
String ss=ByteUtil.getStringFromByteArray((byte[]) ((TupleImplExt) input).get("bytes"));
System.out.println(ss);
logger.info(ss);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
System.out.println("declareOutputFields");
}
}
三、自定义拓扑图入口类
package jiankunking.kafkajstorm.topologies;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import com.alibaba.jstorm.client.ConfigExtension;
import jiankunking.kafkajstorm.bolts.CustomBolt;
import jiankunking.kafkajstorm.kafka.KafkaSpout;
import jiankunking.kafkajstorm.kafka.KafkaSpoutConfig;
import jiankunking.kafkajstorm.util.PropertiesUtil;
import java.util.Map;
/**
* Created by jiankunking on 2017/4/19 16:27.
* 拓扑图 入口类
*/
public class CustomCounterTopology
/**
* 入口类,即提交任务的类
*
* @throws InterruptedException
* @throws AlreadyAliveException
* @throws
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
System.out.println("11111");
PropertiesUtil propertiesUtil = new PropertiesUtil("/application.properties", false);
Map propsMap = propertiesUtil.getAllProperty();
KafkaSpoutConfig spoutConfig = new KafkaSpoutConfig(propertiesUtil.getProps());
spoutConfig.configure(propsMap);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaSpout", new KafkaSpout(spoutConfig));
builder.setBolt("customBolt", new CustomBolt(), 1).shuffleGrouping("kafkaSpout");
//Configuration
Config conf = new Config();
conf.setDebug(false);
//指定使用logback.xml
//需要把logback.xml文件放到jstorm conf目录下
ConfigExtension.setUserDefinedLogbackConf(conf, "%JSTORM_HOME%/conf/logback.xml");
if (args != null && args.length > 0) {
//提交到集群运行
StormSubmitter.submitTopologyWithProgressBar("customCounterTopology", conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
//本地模式运行
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("CustomCounterTopology", conf, builder.createTopology());
}
}
}
四、配置文件application.properties
# Kafka
# Kafka consumer group
kafka.client.id=kafkaspoutid
kafka.broker.partitions=4
kafka.fetch.from.beginning=false
kafka.topic=test_one
kafka.broker.hosts=10.10.10.10:9092
kafka.zookeeper.hosts=10.10.10.10:2181
storm.zookeeper.root=/kafka
小注:
1、jstorm kafka插件源码集成
需要到jstorm的GitHub发布页 https://github.com/alibaba/jstorm/releases 找到你需要使用的release版本,下载源码,并将其中的kafka插件源码集成到你自己的项目中。插件源码位置如下图(原图缺失;在源码包中一般位于 jstorm-utility/jstorm-kafka 目录下,请以实际下载的版本为准):
2、logback的使用
jstorm 2.1.1之后,jstorm默认使用了logback作为日志框架,logback在一般使用时是兼容log4j的,也就是说log4j可以直接桥接到logback,具体为:
a. 添加slf4j-api, log4j-over-slf4j和logback依赖(其实加了logback依赖之后就不需要加slf4j-api依赖了),具体:
<!-- slf4j API, the log4j-to-slf4j bridge, and the logback backend -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.5</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>log4j-over-slf4j</artifactId>
    <version>1.7.10</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.0.13</version>
</dependency>
b. 排除pom中所有的slf4j-log4j12的依赖,因为slf4j-log4j12跟log4j-over-slf4j是冲突的。除了在各个依赖中用exclusion排除之外,也可以像下面这样把它声明为provided,使其不会被打进最终的jar包:
<!-- Declared with provided scope so that slf4j-log4j12 is not bundled into the packaged jar -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.5</version>
    <scope>provided</scope>
</dependency>
这里版本一般是1.7.5,但是还要具体看你的应用pom仲裁出的版本。
理论上,这样就能够把log4j桥接到slf4j。
示例代码地址:https://github.com/JianKunKing/jstorm-kafka-plugin-demo