天天看點

kafka叢集搭建

第一步

先去官網下載下傳 kafka_2.9.2-0.8.1.1.tgz 并解壓再進入到安裝檔案夾(也能夠自己配置路徑,方法跟配置java、hadoop等路徑是一樣的).

> tar -xzf kafka_2.9.2-0.8.1.1.tgz 

> cd kafka_2.9.2-0.8.1.1

第二步

zeekeeper叢集搭建(用的是kafka自帶的zeekeeper,一共準備了三台機器)

1、關閉各台機器的防火牆(一定要切記。我搭建的時候以為能ping通就ok了,就沒關心防火牆的問題了。最後白白浪費了一天的時間)

指令 /ect/init.d/iptables stop

2、進入到打開/ect下的hosts檔案

改動為

127.0.0.1 localhost

10.61.5.66 host1

10.61.5.67 host2

10.61.5.68 host3

(ip和機器名依據個人實際情況改動)

3、改動zeekeeper 配置檔案

進入到kafka安裝檔案夾下的config檔案。打開zookeeper.properties

改動dataDir={kafka安裝檔案夾}/zookeeper/logs/

凝視掉maxClientCnxns=0

在檔案末尾加入例如以下語句

tickTime=2000

initLimit=5

syncLimit=2

#host1、2、3為主機名。能夠依據實際情況更改。port号也能夠更改

server.1=host1:2888:3888

server.2=host2:2888:3888

server.3=host3:2888:3888

4、在dataDir檔案夾下的建立一個myid檔案

指令   echo 1 >myid

另外兩台機子分别設定為2、3,依次類推。

第三步

啟動zookeeper服務(每台機子的zeekeeper都要啟)

> bin/zookeeper-server-start.sh config/zookeeper.properties

在三台機子的zeekeeper都啟動好之前。先啟動的機子會有錯誤日志,這是正常的

第四步

配置kafka

1、在kafka安裝檔案夾下的config檔案夾下打開server.properties檔案

改動

zookeeper.connect=host1:2181,host2:2181,host3:2181    (2181為port号。能夠依據自己的實際情況更改)

其它兩台機子的server.properties檔案裡的broker.id也要改,反正三台機子的broker.id不能有反複

2、改動producer.properties檔案

metadata.broker.list=host1:9092,host2:9092,host3:9092

prodeucer.type=async

3、改動consumer.properties檔案

zeekeeper.connect=host1:2181,host2:2181,host3:2181

4、在每台機子啟動kafka服務

> bin/kafka-server-start.sh config/server.properties

第四步:建立一個主題

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 6 --topic my-replicated-test

factor大小不能超過broker數

通過下面指令檢視主題

> bin/kafka-topics.sh --list --zookeeper host1:2181 (也能夠是host2:2181等)

my-replicated-test

通過下述指令能夠看到該主題詳情

> bin/kafka-topics.sh --describe --zookeeper host1:2181 --topic my-replicated-test

第五步:發送消息

在host2上建立生産者角色。并發送消息(事實上能夠是三台機子中的不論什麼一台)

> bin/kafka-console-producer.sh --broker-list host1:9092 --topic my-replicated-test 

This is a message

This is another message

在host3上建立消費者角色(在該終端窗體内能夠看到生産者公布這消息)

> bin/kafka-console-consumer.sh --zookeeper host1:2181 --topic my-replicated-test --from-beginning

至此。一個kafka叢集就搭好了,能夠作為kafkaserver了

 測試程式(在win系統上)

切記要去C:\Windows\system32\drivers\etc\hosts作例如以下配置,否則測試程式無法訪問kafkaserver!

記得将kafka安裝檔案夾下libs裡的全部包導入項目裡去

//生産者測試程式

public class ProducerTest {

 public static void main(String[] args) throws FileNotFoundException {  

        Properties props = new Properties();  

        props.put("zookeeper.connect", "slaves7:2182,slaves8:2182,slaves9:2182");  

        props.put("serializer.class", "kafka.serializer.StringEncoder");  

        props.put("metadata.broker.list","slaves7:9092,slaves8:9092,slaves9:9092");

        ProducerConfig config = new ProducerConfig(props);  

        Producer<String, String> producer = new Producer<String, String>(config);  

         File file=new File("E:/test","test.txt");

         BufferedReader readtxt=new BufferedReader(new FileReader(file));

          String line=null;

          byte[] item=null;

   try {

      while((line=readtxt.readLine())!=null){

      item=line.getBytes();

      String str = new String(item);

      System.out.println(str);

      producer.send(new KeyedMessage<String, String>("my-replicated-topic",str));

      }

   } catch (IOException e) {

    e.printStackTrace();

   }

       }  

}

//消費者測試程式

public class ConsumerTest extends Thread {

 private final ConsumerConnector consumer;  

    private final String topic;  

    public static void main(String[] args) {  

        ConsumerTest consumerThread = new ConsumerTest("my-replicated-topic");  

        consumerThread.start();  

    }  

    public ConsumerTest(String topic) {  

     System.out.println(topic);

        consumer = kafka.consumer.Consumer  

                .createJavaConsumerConnector(createConsumerConfig());  

        this.topic = topic;  

    private static ConsumerConfig createConsumerConfig() {  

        Properties props = new Properties();  

        props.put("zookeeper.connect", "slaves7:2182,slaves8,slaves9:2182");  

        props.put("group.id", "0");  

        props.put("zookeeper.session.timeout.ms", "400000");  

        props.put("zookeeper.sync.time.ms", "200");  

        props.put("auto.commit.interval.ms", "1000");  

        return new ConsumerConfig(props);  

    public void run() {  

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  

        topicCountMap.put(topic, new Integer(1));  

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer  

                .createMessageStreams(topicCountMap);  

        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);  

        ConsumerIterator<byte[], byte[]> it = stream.iterator();  

        while (it.hasNext())  

            System.out.println(new String(it.next().message()));  

當兩個測試程式都執行後,生産者程式會從本機讀取txt檔案的内容,消費者程式會顯示出這些内容