天天看点

jstorm kafka插件使用案例

本文用的是jstorm 2.2.1

一、pom引用

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>jiankunking</groupId>
    <artifactId>kafkajstorm</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <!--<scope>test</scope>-->
        </dependency>
        <!--jstorm begin-->
        <dependency>
            <groupId>com.alibaba.jstorm</groupId>
            <artifactId>jstorm-core</artifactId>
            <version>2.2.1</version>
            <!--<scope>provided</scope>-->
            <!-- exclude log4j bindings: this project bridges log4j onto logback -->
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.9.2</artifactId>
            <version>0.8.1</version>
            <exclusions>
                <!-- zookeeper is declared explicitly below to control its version -->
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>junit</groupId>
                    <artifactId>junit</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.5</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.5.0</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1</version>
        </dependency>
        <!--jstorm end-->


        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.3</version>
        </dependency>


        <!--logback begin-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
        <!-- routes log4j API calls onto slf4j (and thus logback) -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
            <version>1.7.10</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.0.13</version>
        </dependency>
        <!--logback end-->
    </dependencies>


    <build>
        <plugins>
            <!-- build a fat jar (jar-with-dependencies) at package time for topology submission -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

二、自定义bolt

package jiankunking.kafkajstorm.bolts;


import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleImplExt;
import jiankunking.kafkajstorm.util.ByteUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;




/**
 * Created by jiankunking on 2017/4/19 16:47.
 */
public class CustomBolt extends BaseBasicBolt

    protected final Logger logger = LoggerFactory.getLogger(CustomBolt.class);

    public void execute(Tuple input, BasicOutputCollector collector) {
        try {
            String ss=ByteUtil.getStringFromByteArray((byte[]) ((TupleImplExt) input).get("bytes"));
            System.out.println(ss);
            logger.info(ss);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        System.out.println("declareOutputFields");
    }
}      

三、自定义拓扑图入口类

package jiankunking.kafkajstorm.topologies;


import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import com.alibaba.jstorm.client.ConfigExtension;
import jiankunking.kafkajstorm.bolts.CustomBolt;
import jiankunking.kafkajstorm.kafka.KafkaSpout;
import jiankunking.kafkajstorm.kafka.KafkaSpoutConfig;
import jiankunking.kafkajstorm.util.PropertiesUtil;

import java.util.Map;

/**
 * Created by jiankunking on 2017/4/19 16:27.
 * 拓扑图 入口类
 */
public class CustomCounterTopology

    /**
     * 入口类,即提交任务的类
     *
     * @throws InterruptedException
     * @throws AlreadyAliveException
     * @throws
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        System.out.println("11111");
        PropertiesUtil propertiesUtil = new PropertiesUtil("/application.properties", false);
        Map propsMap = propertiesUtil.getAllProperty();
        KafkaSpoutConfig spoutConfig = new KafkaSpoutConfig(propertiesUtil.getProps());
        spoutConfig.configure(propsMap);
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", new KafkaSpout(spoutConfig));
        builder.setBolt("customBolt", new CustomBolt(), 1).shuffleGrouping("kafkaSpout");
        //Configuration
        Config conf = new Config();
        conf.setDebug(false);
        //指定使用logback.xml
        //需要把logback.xml文件放到jstorm conf目录下
        ConfigExtension.setUserDefinedLogbackConf(conf, "%JSTORM_HOME%/conf/logback.xml");
        if (args != null && args.length > 0) {
            //提交到集群运行
            StormSubmitter.submitTopologyWithProgressBar("customCounterTopology", conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            //本地模式运行
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("CustomCounterTopology", conf, builder.createTopology());
        }
    }
}      

四、配置文件application.properties

# Kafka settings
# Kafka consumer group / client id used by the spout
kafka.client.id=kafkaspoutid
# partition count of the topic — TODO confirm it matches the broker-side config
kafka.broker.partitions=4
# false: resume from the stored offset rather than the beginning of the topic
kafka.fetch.from.beginning=false
kafka.topic=test_one
kafka.broker.hosts=10.10.10.10:9092
kafka.zookeeper.hosts=10.10.10.10:2181

# zookeeper root path where the spout stores state — presumably offsets; verify against KafkaSpoutConfig
storm.zookeeper.root=/kafka

小注:

1、jstorm kafka插件源码集成

需要到jstorm的github官网 https://github.com/alibaba/jstorm/releases 中找到你需要使用的release版本,下载源码,将其中的插件源码集成到你自己的项目中,插件源码位置如下图:

jstorm kafka插件使用案例

2、logback的使用

jstorm 2.1.1之后,jstorm默认使用了logback作为日志框架,logback在一般使用时是兼容log4j的,也就是说log4j可以直接桥接到logback,具体为:

a. 添加slf4j-api, log4j-over-slf4j和logback依赖(其实加了logback依赖之后就不需要加slf4j-api依赖了),具体:

<!-- slf4j-api + log4j-over-slf4j + logback-classic:
     bridges log4j API calls onto logback via slf4j -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.5</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>log4j-over-slf4j</artifactId>
    <version>1.7.10</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.0.13</version>
</dependency>

b. 排除pom中所有的slf4j-log4j12的依赖,因为slf4j-log4j12跟log4j-over-slf4j是冲突的:

<!-- declare slf4j-log4j12 with provided scope to keep it off the runtime
     classpath: it conflicts with log4j-over-slf4j -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.5</version>
    <scope>provided</scope>
</dependency>

这里版本一般是1.7.5,但是还要具体看你的应用pom仲裁出的版本。

理论上,这样就能够把log4j桥接到slf4j。

https://github.com/JianKunKing/jstorm-kafka-plugin-demo