天天看點

Kafka的生産者和消費者代碼解析

1:Kafka名詞解釋和工作方式
    1.1:Producer :消息生産者,就是向kafka broker發消息的用戶端。
    1.2:Consumer :消息消費者,向kafka broker取消息的用戶端
    1.3:Topic :可以了解為一個隊列。
    1.4:Consumer Group (CG):這是kafka用來實作一個topic消息的廣播(發給所有的consumer)和單點傳播(發給任意一個consumer)的手段。一個topic可以有多個CG。topic的消息會複制(不是真的複制,是概念上的)到所有的CG,但每個partion隻會把消息發給該CG中的一個consumer。如果需要實作廣播,隻要每個consumer有一個獨立的CG就可以了。要實作單點傳播隻要所有的consumer在同一個CG。用CG還可以将consumer進行自由的分組而不需要多次發送消息到不同的topic。
    1.5:Broker :一台kafka伺服器就是一個broker。一個叢集由多個broker組成。一個broker可以容納多個topic。
    1.6:Partition:為了實作擴充性,一個非常大的topic可以分布到多個broker(即伺服器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被配置設定一個有序的id(offset)。kafka隻保證按一個partition中的順序将消息發給consumer,不保證一個topic的整體(多個partition間)的順序。
    1.7:Offset:kafka的存儲檔案都是按照offset.kafka來命名,用offset做名字的好處是友善查找。例如你想找位于2049的位置,隻要找到2048.kafka的檔案即可。當然the first offset就是00000000000.kafka。

2:Consumer與topic關系?本質上kafka隻支援Topic。
  2.1:每個group中可以有多個consumer,每個consumer屬于一個consumer group;
通常情況下,一個group中會包含多個consumer,這樣不僅可以提高topic中消息的并發消費能力,而且還能提高"故障容錯"性,如果group中的某個consumer失效那麼其消費的partitions将會有其他consumer自動接管。
  2.2:對于Topic中的一條特定的消息,隻會被訂閱此Topic的每個group中的其中一個consumer消費,此消息不會發送給一個group的多個consumer;
那麼一個group中所有的consumer将會交錯的消費整個Topic,每個group中consumer消息消費互相獨立,我們可以認為一個group是一個"訂閱"者。
  2.3:在kafka中,一個partition中的消息隻會被group中的一個consumer消費(同一時刻);
一個Topic中的每個partions,隻會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以同時消費多個partitions中的消息。
  2.4:kafka的設計原理決定,對于一個topic,同一個group中不能有多于partitions個數的consumer同時消費,否則将意味着某些consumer将無法得到消息。
  2.5:kafka隻能保證一個partition中的消息被某個consumer消費時是順序的;事實上,從Topic角度來說,當有多個partitions時,消息仍不是全局有序的。

3:Kafka消息的分發,Producer用戶端負責消息的分發。
  3.1:kafka叢集中的任何一個broker都可以向producer提供metadata資訊,這些metadata中包含"叢集中存活的servers清單"/"partitions leader清單"等資訊;
  3.2:當producer擷取到metadata資訊之後, producer将會和Topic下所有partition leader保持socket連接配接;
  3.3:消息由producer直接通過socket發送到broker,中間不會經過任何"路由層",事實上,消息被路由到哪個partition上由producer用戶端決定;
      比如可以采用"random""key-hash""輪詢"等,如果一個topic中有多個partitions,那麼在producer端實作"消息均衡分發"是必要的。
  3.4:在producer端的配置檔案中,開發者可以指定partition路由的方式。
  3.5:Producer消息發送的應答機制:
    設定發送資料是否需要服務端的回報,三個值0,1,-1。
    0: producer不會等待broker發送ack。 
    1: 當leader接收到消息之後發送ack。
    -1: 當所有的follower都同步消息成功後發送ack。
        request.required.acks=0。

4:Consumer的負載均衡:
  當一個group中,有consumer加入或者離開時,會觸發partitions均衡.均衡的最終目的,是提升topic的并發消費能力:      

 步驟如下:

a、假如topic1,具有如下partitions: P0,P1,P2,P3。

b、加入group中,有如下consumer: C1,C2。

c、首先根據partition索引号對partitions排序: P0,P1,P2,P3。

d、根據consumer.id排序: C0,C1。

e、計算倍數: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)。

f、然後依次配置設定partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]。

Kafka的生産者和消費者代碼解析

6:Kafka檔案存儲基本結構:

  6.1:在Kafka檔案存儲中,同一個topic下有多個不同partition,每個partition為一個目錄,partiton命名規則為topic名稱+有序序号,第一個partiton序号從0開始,序号最大值為partitions數量減1。

  6.2:每個partion(目錄)相當于一個巨型檔案被平均配置設定到多個大小相等segment(段)資料檔案中。但每個段segment file消息數量不一定相等,這種特性友善old segment file快速被删除。預設保留7天的資料。

  6.3:每個partiton隻需要支援順序讀寫就行了,segment檔案生命周期由服務端配置參數決定。(什麼時候建立,什麼時候删除)。

1:使用Idea進行開發,源碼如下所示,首先加入Kafka必須依賴的包,這句話意味着你必須要先在Idea上面搭建好的你的maven環境:

pom.xml如下所示内容:

1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0"
 3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 5     <modelVersion>4.0.0</modelVersion>
 6 
 7     <groupId>com.bie</groupId>
 8     <artifactId>storm</artifactId>
 9     <version>1.0-SNAPSHOT</version>
10 
11     <!-- storm的依賴關系 -->
12     <dependencies>
13         <!--storm依賴的包-->
14         <dependency>
15             <groupId>org.apache.storm</groupId>
16             <artifactId>storm-core</artifactId>
17             <version>0.9.5</version>
18             <!--<scope>provided</scope>-->
19         </dependency>
20         <!-- kafka依賴的包-->
21         <dependency>
22             <groupId>org.apache.kafka</groupId>
23             <artifactId>kafka_2.8.2</artifactId>
24             <version>0.8.1</version>
25             <exclusions>
26                 <exclusion>
27                     <artifactId>jmxtools</artifactId>
28                     <groupId>com.sun.jdmk</groupId>
29                 </exclusion>
30                 <exclusion>
31                     <artifactId>jmxri</artifactId>
32                     <groupId>com.sun.jmx</groupId>
33                 </exclusion>
34                 <exclusion>
35                     <artifactId>jms</artifactId>
36                     <groupId>javax.jms</groupId>
37                 </exclusion>
38                 <exclusion>
39                     <groupId>org.apache.zookeeper</groupId>
40                     <artifactId>zookeeper</artifactId>
41                 </exclusion>
42                 <exclusion>
43                     <groupId>org.slf4j</groupId>
44                     <artifactId>slf4j-log4j12</artifactId>
45                 </exclusion>
46                 <exclusion>
47                     <groupId>org.slf4j</groupId>
48                     <artifactId>slf4j-api</artifactId>
49                 </exclusion>
50             </exclusions>
51         </dependency>
52     </dependencies>
53 
54     <!--如果依賴外部包,就打不進去外部包,是以需要引入下面所示-->
55     <build>
56         <plugins>
57             <plugin>
58                 <!--把其他外部依賴的jar包打成一個大jar包-->
59                 <artifactId>maven-assembly-plugin</artifactId>
60                 <configuration>
61                     <descriptorRefs>
62                         <descriptorRef>jar-with-dependencies</descriptorRef>
63                     </descriptorRefs>
64                     <archive>
65                         <manifest>
66                             <mainClass>com.bie.wordcount.WordCountTopologyMain</mainClass>
67                         </manifest>
68                     </archive>
69                 </configuration>
70                 <executions>
71                     <execution>
72                         <id>make-assembly</id>
73                         <phase>package</phase>
74                         <goals>
75                             <goal>single</goal>
76                         </goals>
77                     </execution>
78                 </executions>
79             </plugin>
80             <plugin>
81                 <groupId>org.apache.maven.plugins</groupId>
82                 <artifactId>maven-compiler-plugin</artifactId>
83                 <configuration>
84                     <source>1.7</source>
85                     <target>1.7</target>
86                 </configuration>
87             </plugin>
88         </plugins>
89     </build>
90 
91 
92 </project>      

然後呢,書寫你的生産者源碼,如下所示:

package com.bie.kafka;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;
import java.util.UUID;

/**
 * 這是一個簡單的Kafka producer代碼
 * 包含兩個功能:
 * 1、資料發送
 * 2、資料按照自定義的partition政策進行發送
 *
 *
 * KafkaSpout的類
 */
public class KafkaProducerSimple {


    public static void main(String[] args) {
        /**
         * 1、指定目前kafka producer生産的資料的目的地
         *  建立topic可以輸入以下指令,在kafka叢集的任一節點進行建立。
         *  bin/kafka-topics.sh --create --zookeeper master:2181
         *  --replication-factor 1 --partitions 1 --topic orderMq
         */
        String TOPIC = "orderMq8";
        /**
         * 2、讀取配置檔案
         */
        Properties props = new Properties();
        /*
         * key.serializer.class預設為serializer.class
         */
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        /*
         * kafka broker對應的主機,格式為host1:port1,host2:port2
         */
        props.put("metadata.broker.list", "master:9092,slaver1:9092,slaver2:9092");
        /*
         * request.required.acks,設定發送資料是否需要服務端的回報,有三個值0,1,-1
         * 0,意味着producer永遠不會等待一個來自broker的ack,這就是0.7版本的行為。
         * 這個選項提供了最低的延遲,但是持久化的保證是最弱的,當server挂掉的時候會丢失一些資料。
         * 1,意味着在leader replica已經接收到資料後,producer會得到一個ack。
         * 這個選項提供了更好的持久性,因為在server确認請求成功處理後,client才會傳回。
         * 如果剛寫到leader上,還沒來得及複制leader就挂了,那麼消息才可能會丢失。
         * -1,意味着在所有的ISR都接收到資料後,producer才得到一個ack。
         * 這個選項提供了最好的持久性,隻要還有一個replica存活,那麼資料就不會丢失
         */
        props.put("request.required.acks", "1");
        /*
         * 可選配置,如果不配置,則使用預設的partitioner partitioner.class
         * 預設值:kafka.producer.DefaultPartitioner
         * 用來把消息分到各個partition中,預設行為是對key進行hash。
         */
        props.put("partitioner.class", "com.bie.kafka.MyLogPartitioner");
        //props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
        /**
         * 3、通過配置檔案,建立生産者
         */
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        /**
         * 4、通過for循環生産資料
         */
        for (int messageNo = 1; messageNo < 100000; messageNo++) {
            String messageStr = new String(messageNo + "注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey," +
                    "注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發" +
                    "注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發" +
                    "注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發" +
                    "注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發" +
                    "注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發" +
                    "注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發" +
                    "注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發" +
                    "用來配合自定義的MyLogPartitioner進行資料分發");

            /**
             * 5、調用producer的send方法發送資料
             * 注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發
             */
            producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + messageStr));

            //producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + "biexiansheng"));
        }
    }
}      

生産者需要的Partitioner如下所示内容:

package com.bie.kafka;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
import org.apache.log4j.Logger;


public class MyLogPartitioner implements Partitioner {
    private static Logger logger = Logger.getLogger(MyLogPartitioner.class);

    public MyLogPartitioner(VerifiableProperties props) {
    }

    public int partition(Object obj, int numPartitions) {
        return Integer.parseInt(obj.toString())%numPartitions;
//        return 1;
    }

}      

生産者運作效果如下所示:

Kafka的生産者和消費者代碼解析

消費者代碼如下所示:

package com.bie.kafka;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class KafkaConsumerSimple implements Runnable {
    public String title;
    public KafkaStream<byte[], byte[]> stream;
    public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {
        this.title = title;
        this.stream = stream;
    }
    @Override
    public void run() {
        System.out.println("開始運作 " + title);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        /**
         * 不停地從stream讀取新到來的消息,在等待新的消息時,hasNext()會阻塞
         * 如果調用 `ConsumerConnector#shutdown`,那麼`hasNext`會傳回false
         * */
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> data = it.next();
            Object topic = data.topic();
            int partition = data.partition();
            long offset = data.offset();
            String msg = new String(data.message());
            System.out.println(String.format(
                    "Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], msg: [%s]",
                    title, topic, partition, offset, msg));
        }
        System.out.println(String.format("Consumer: [%s] exiting ...", title));
    }

    public static void main(String[] args) throws Exception{
        Properties props = new Properties();
        props.put("group.id", "biexiansheng");
        props.put("zookeeper.connect", "master:2181,slaver1:2181,slaver2:2181");
        props.put("auto.offset.reset", "largest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("partition.assignment.strategy", "roundrobin");
        ConsumerConfig config = new ConsumerConfig(props);
        String topic1 = "orderMq8";
        //String topic2 = "paymentMq";
        //隻要ConsumerConnector還在的話,consumer會一直等待新消息,不會自己退出
        ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);
        //定義一個map
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic1, 3);
        //Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是對應的流
        Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);
        //取出 `kafkaTest` 對應的 streams
        List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic1);
        //建立一個容量為4的線程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        //建立20個consumer threads
        for (int i = 0; i < streams.size(); i++) {
            executor.execute(new KafkaConsumerSimple("消費者" + (i + 1), streams.get(i)));
        }
    }
}      

消費者運作如下所示:

運作消費者出現下面的錯誤,解決方法将pomx.ml裡面的zookeeper配置注釋了即可:

Kafka的生産者和消費者代碼解析

 錯誤如下所示:

1 D:\soft\Java\jdk1.7.0_80\bin\java -javaagent:E:\360Downloads\idea\lib\idea_rt.jar=61635:E:\360Downloads\idea\bin -Dfile.encoding=UTF-8 -classpath D:\soft\Java\jdk1.7.0_80\jre\lib\charsets.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\deploy.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\ext\access-bridge-64.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\ext\dnsns.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\ext\jaccess.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\ext\localedata.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\ext\sunec.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\ext\sunjce_provider.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\ext\sunmscapi.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\ext\zipfs.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\javaws.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\jce.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\jfr.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\jfxrt.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\jsse.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\management-agent.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\plugin.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\resources.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\rt.jar;E:\360Downloads\idea\storm\target\classes;E:\maven\repository\org\apache\storm\storm-core\0.9.5\storm-core-0.9.5.jar;E:\maven\repository\org\clojure\clojure\1.5.1\clojure-1.5.1.jar;E:\maven\repository\clj-time\clj-time\0.4.1\clj-time-0.4.1.jar;E:\maven\repository\joda-time\joda-time\2.0\joda-time-2.0.jar;E:\maven\repository\compojure\compojure\1.1.3\compojure-1.1.3.jar;E:\maven\repository\org\clojure\core.incubator\0.1.0\core.incubator-0.1.0.jar;E:\maven\repository\org\clojure\tools.macro\0.1.0\tools.macro-0.1.0.jar;E:\maven\repository\clout\clout\1.0.1\clout-1.0.1.jar;E:\maven\repository\ring\ring-core\1.1.5\ring-core-1.1.5.jar;E:\maven\repository\commons-fileupload\commons-fileupload\1.2.1\commons-fileupload-1.2.1.jar;E:\maven\repository\javax\servlet\servlet-api\2.5\servlet-api-2.5.jar;E:\maven\repository\hiccup\hiccup\0.3.6\hiccup-0.3.6.jar;E:\maven\repository\ring\ring-devel\0.3.11\ring-devel-0.3.11.jar;E:\maven\repository\clj-stacktrace\clj-stacktrace\0.2.2\clj-stacktrace-0.2.2.jar;E:\maven\repository\ring\ring-jetty-adapter\0.3.11\ring-jetty-adapter-0.3.11.jar;E:\maven\repository\ring\ring-servlet\0.3.11\ring-servlet-0.3.11.jar;E:\maven\repository\org\mortbay\jetty\jetty\6.1.26\jetty-6.1.26.jar;E:\maven\repository\org\mortbay\jetty\jetty-util\6.1.26\jetty-util-6.1.26.jar;E:\maven\repository\org\clojure\tools.logging\0.2.3\tools.logging-0.2.3.jar;E:\maven\repository\org\clojure\math.numeric-tower\0.0.1\math.numeric-tower-0.0.1.jar;E:\maven\repository\org\clojure\tools.cli\0.2.4\tools.cli-0.2.4.jar;E:\maven\repository\commons-io\commons-io\2.4\commons-io-2.4.jar;E:\maven\repository\org\apache\commons\commons-exec\1.1\commons-exec-1.1.jar;E:\maven\repository\commons-lang\commons-lang\2.5\commons-lang-2.5.jar;E:\maven\repository\com\googlecode\json-simple\json-simple\1.1\json-simple-1.1.jar;E:\maven\repository\com\twitter\carbonite\1.4.0\carbonite-1.4.0.jar;E:\maven\repository\com\esotericsoftware\kryo\kryo\2.21\kryo-2.21.jar;E:\maven\repository\com\esotericsoftware\reflectasm\reflectasm\1.07\reflectasm-1.07-shaded.jar;E:\maven\repository\org\ow2\asm\asm\4.0\asm-4.0.jar;E:\maven\repository\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;E:\maven\repository\org\objenesis\objenesis\1.2\objenesis-1.2.jar;E:\maven\repository\com\twitter\chill-java\0.3.5\chill-java-0.3.5.jar;E:\maven\repository\org\yaml\snakeyaml\1.11\snakeyaml-1.11.jar;E:\maven\repository\commons-logging\commons-logging\1.1.3\commons-logging-1.1.3.jar;E:\maven\repository\commons-codec\commons-codec\1.6\commons-codec-1.6.jar;E:\maven\repository\com\googlecode\disruptor\disruptor\2.10.1\disruptor-2.10.1.jar;E:\maven\repository\org\jgrapht\jgrapht-core\0.9.0\jgrapht-core-0.9.0.jar;E:\maven\repository\ch\qos\logback\logback-classic\1.0.13\logback-classic-1.0.13.jar;E:\maven\repository\ch\qos\logback\logback-core\1.0.13\logback-core-1.0.13.jar;E:\maven\repository\org\slf4j\slf4j-api\1.7.5\slf4j-api-1.7.5.jar;E:\maven\repository\org\slf4j\log4j-over-slf4j\1.6.6\log4j-over-slf4j-1.6.6.jar;E:\maven\repository\jline\jline\2.11\jline-2.11.jar;E:\maven\repository\org\apache\kafka\kafka_2.8.2\0.8.1\kafka_2.8.2-0.8.1.jar;E:\maven\repository\org\scala-lang\scala-library\2.8.2\scala-library-2.8.2.jar;E:\maven\repository\com\yammer\metrics\metrics-annotation\2.2.0\metrics-annotation-2.2.0.jar;E:\maven\repository\com\yammer\metrics\metrics-core\2.2.0\metrics-core-2.2.0.jar;E:\maven\repository\org\xerial\snappy\snappy-java\1.0.5\snappy-java-1.0.5.jar;E:\maven\repository\net\sf\jopt-simple\jopt-simple\3.2\jopt-simple-3.2.jar;E:\maven\repository\com\101tec\zkclient\0.3\zkclient-0.3.jar;E:\maven\repository\log4j\log4j\1.2.14\log4j-1.2.14.jar com.bie.kafka.KafkaConsumerSimple
 2 260  [main] INFO  kafka.utils.VerifiableProperties - Verifying properties
 3 311  [main] INFO  kafka.utils.VerifiableProperties - Property auto.commit.interval.ms is overridden to 1000
 4 311  [main] INFO  kafka.utils.VerifiableProperties - Property auto.offset.reset is overridden to largest
 5 311  [main] INFO  kafka.utils.VerifiableProperties - Property group.id is overridden to biexiansheng
 6 312  [main] WARN  kafka.utils.VerifiableProperties - Property partition.assignment.strategy is not valid
 7 312  [main] INFO  kafka.utils.VerifiableProperties - Property zookeeper.connect is overridden to master:2181,slaver1:2181,slaver2:2181
 8 448  [main] INFO  kafka.consumer.ZookeeperConsumerConnector - [biexiansheng_HY-201707051724-1516692275031-bffb9bfb], Connecting to zookeeper instance at master:2181,slaver1:2181,slaver2:2181
 9 Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/zookeeper/Watcher
10     at java.lang.ClassLoader.defineClass1(Native Method)
11     at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
12     at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
13     at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
14     at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
15     at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
16     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
17     at java.security.AccessController.doPrivileged(Native Method)
18     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
19     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
20     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
21     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
22     at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:156)
23     at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:114)
24     at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
25     at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67)
26     at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
27     at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
28     at com.bie.kafka.KafkaConsumerSimple.main(KafkaConsumerSimple.java:58)
29 Caused by: java.lang.ClassNotFoundException: org.apache.zookeeper.Watcher
30     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
31     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
32     at java.security.AccessController.doPrivileged(Native Method)
33     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
34     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
35     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
36     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
37     ... 19 more
38 
39 Process finished with exit code 1