天天看点

java 集成 kafka 0.8.2.1 适配jdk1.6

文章目录

  • ​​一、版本说明​​
  • ​​二、实战​​
  • ​​2.1. 依赖​​
  • ​​2.2. 生产者代码​​
  • ​​2.3. 消费端代码​​
  • ​​2.4. 测试​​
  • ​​三、小伙伴疑难解答​​
  • ​​3.1. 首先新建一个maven项目​​
  • ​​3.2. 把我的依赖和代码复制过去​​
  • ​​3.3. 把我写的case调试通​​
  • ​​3.4. 找到左边External Libraries​​
  • ​​3.5. jar处理​​
  • ​​3.6. 打开非maven项目,添加jar​​
  • ​​3.7. 等待项目编译​​
  • ​​四、 项目jar和引入的jar冲突建议​​
  • ​​4.1. 定位问题​​
  • ​​4.2. 分析问题​​
  • ​​五、解决问题​​
  • ​​5.1. 解决问题场景​​
  • ​​5.2. 方案1​​
  • ​​5.2. 方案2​​
  • ​​5.2. 方案3​​
一、版本说明
linux服务器环境软件 版本
jdk jdk-8u144-linux-x64.tar.gz
kafka kafka_2.9.2-0.8.2.1.tgz
应用服务器软件 版本
jdk jdk1.6.0_24
kafka kafka_2.9.2-0.8.2.1
二、实战

2.1. 依赖

<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.9.2</artifactId>
      <version>0.8.2.1</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.15</version>
      <exclusions>
        <exclusion>
          <artifactId>jmxtools</artifactId>
          <groupId>com.sun.jdmk</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jmxri</artifactId>
          <groupId>com.sun.jmx</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jms</artifactId>
          <groupId>javax.jms</groupId>
        </exclusion>
        <exclusion>
          <artifactId>mail</artifactId>
          <groupId>javax.mail</groupId>
        </exclusion>
      </exclusions>
    </dependency>      

2.2. 生产者代码

package com.sinosoft.d;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

/**
 * kafka生产端
 *
 * @author gblfy
 * @date 2020-08-07
 *
 * Kafka生产者测试
 * http://kafka.apache.org/documentation.html#introduction

 */
public class KafkaProducetest {

    private final Producer<String, String> producer;
    public final static String TOPIC = "clicki_info_topic";

    private KafkaProducetest() {
        Properties props = new Properties();
        //此处配置的是kafka的端口
        props.put("metadata.broker.list", "10.5.6.19:9092");

        //配置value的序列化类
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        //配置key的序列化类
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");

        //0表示不确认主服务器是否收到消息,马上返回,低延迟但最弱的持久性,数据可能会丢失
        //1表示确认主服务器收到消息后才返回,持久性稍强,可是如果主服务器死掉,从服务器数据尚未同步,数据可能会丢失
        //-1表示确认所有服务器都收到数据,完美!
        props.put("request.required.acks", "-1");

        //异步生产,批量存入缓存后再发到服务器去
        props.put("producer.type", "async");

        //填充配置,初始化生产者
        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    void produce() {
        int messageNo = 1000;
        final int COUNT = 2000;

        while (messageNo < COUNT) {
            String key = String.valueOf(messageNo);
            String data = "hello kafka message " + key;
            String data1 = "{\"c\":0,\"i\":16114765323924126,\"n\":\"http://www.abbo.cn/clicki.html\",\"s\":0,\"sid\":0,\"t\":\"info_url\",\"tid\":0,\"unix\":0,\"viewId\":0}";
            // 发送消息
            //            producer.send(new KeyedMessage<String, String>(TOPIC,data1));
            // 消息类型key:value
            producer.send(new KeyedMessage<String, String>(TOPIC, key, data));
            System.out.println(data);
            messageNo++;
        }
        producer.close();//必须关闭
    }

    public static void main(String[] args) {
        new KafkaProducetest().produce();

    }
}      

2.3. 消费端代码

package com.sinosoft.d;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

/**
 * kafka生产端
 *
 * @author gblfy
 * @date 2020-08-07
 * <p>
 * Kafka消费者测试
 */
public class KafkaConsumertest {

    private final ConsumerConnector consumer;

    private KafkaConsumertest() {
        Properties props = new Properties();
        //zookeeper 配置
        props.put("zookeeper.connect", "10.5.6.19:2181");

        //group 代表一个消费组,加入组里面,消息只能被该组的一个消费者消费
        //如果所有消费者在一个组内,就是传统的队列模式,排队拿消息
        //如果所有的消费者都不在同一个组内,就是发布-订阅模式,消息广播给所有组
        //如果介于两者之间,那么广播的消息在组内也是要排队的
        props.put("group.id", "jd-group");

        //zk连接超时
        props.put("zookeeper.session.timeout.ms", "4000");//ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
        props.put("zookeeper.sync.time.ms", "200");//zk follower落后于zk leader的最长时间
        props.put("auto.commit.interval.ms", "1000");//往zookeeper上写offset的频率
        /*
         * 此配置参数表示当此groupId下的消费者,在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),consumer应该从哪个offset开始消费.
         * largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的开始位置消费所有消息.
         * */
        props.put("auto.offset.reset", "smallest");  //消费最老消息,最新为largest
        //序列化类
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        ConsumerConfig config = new ConsumerConfig(props);

        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    void consume() {
        // 描述读取哪个topic,需要几个线程读
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(KafkaProducetest.TOPIC, new Integer(1));


        /* 默认消费时的数据是byte[]形式的,可以传入String编码器*/
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

        //消费数据时每个Topic有多个线程在读,所以取List第一个流
        KafkaStream<String, String> stream = consumerMap.get(KafkaProducetest.TOPIC).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println(it.next().topic() + ":" + it.next().partition() + ":" + it.next().offset() + ":" + it.next().key() + ":" + it.next().message());
        }
    }

    public static void main(String[] args) {
        new KafkaConsumertest().consume();
    }
}      

2.4. 测试

先启动消费端,在启动生产端

java 集成 kafka 0.8.2.1 适配jdk1.6
java 集成 kafka 0.8.2.1 适配jdk1.6
三、小伙伴疑难解答

有的小伙伴问我,我们的工程师非maven项目该怎么办呢?

我给这个小伙伴的建议是以下几点:

3.1. 首先新建一个maven项目

3.2. 把我的依赖和代码复制过去

3.3. 把我写的case调试通

3.4. 找到左边External Libraries

java 集成 kafka 0.8.2.1 适配jdk1.6

3.5. jar处理

在本地的maven仓库中把这些以来的jar复制出来,建议先发到一个空的文件夹里面,建议和我一样

java 集成 kafka 0.8.2.1 适配jdk1.6

3.6. 打开非maven项目,添加jar

打开你的非maven项目,把这些jar添加进去

java 集成 kafka 0.8.2.1 适配jdk1.6
java 集成 kafka 0.8.2.1 适配jdk1.6

3.7. 等待项目编译

四、 项目jar和引入的jar冲突建议

关于这个问题,很多小伙伴应该也遇到过很正常,首先要保证引入的jar不能和以前的jar产生冲突?

那又该怎么办?

有的小伙伴说,把以前和本次引入jar冲突的jar删除呗!要三思

4.1. 定位问题

首先定位引入的jar和项目中的哪一个jar发生冲突

4.2. 分析问题

冲突的原因是什么?

1.引用的对象的包路径一样并且对象名也一样

2.二个冲突的kar项目中加入都存在,但是,代码中引入jar的优先级问题,引入的jar非自己需要的jar,而自己需要的jar默认不会引入,导致代码报错

3.版本问题,jar向下不兼容

五、解决问题

5.1. 解决问题场景

首先解决问题要基于场景:

我引入的jar,只有我自己用,但是,我的代码中默认引入的jar非我需要的,但是,把项目中以前的jar(冲突的jar)删除,代码就不报错。

5.2. 方案1

这种场景有3种解决方案

方案1(风险可控):如果,可以确保删除以前的jar不会项目的其他功能造成影响,可以考虑删除以前旧的jar

5.2. 方案2

5.2. 方案3