天天看點

linux搭建kafka單機+叢集環境+demo

一. 單機環境

  1. 準備工作:

    1.1. 一台或兩台centos虛拟機,一台給zookeeper(192.168.56.13),一台給kafka(192.168.56.10)。也可以将zookeeper和kafka放在同一台虛拟機上

    1.2. jdk 1.8

    https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html

    1.3. zookeeper

    https://zookeeper.apache.org/releases.html

    1.4. kafka

    http://kafka.apache.org/downloads

我所有的工具軟體都安裝再/usr/local/soft目錄下

  1. 安裝jdk

    2.1 切換到/usr/local目錄下,建立soft目錄,并将下載下傳的jdk放到/usr/local/soft目錄下解壓

cd /usr/local
mkdir soft
cd soft
tar -zxvf jdk-8u261-linux-x64.tar.gz
配置java環境變量
vi /etc/profile
           

2.2 在profile檔案末尾添加如下配置,

export JAVA_HOME=/usr/local/soft/jdk1.8.0_261
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:$CLASSPATH
export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin
export PATH=$PATH:${JAVA_PATH}

           

儲存退出後,執行

source /etc/profile

使修改立即生效

檢視是否配置成功

java -version

如果出現如下内容,則說明配置成功

linux搭建kafka單機+叢集環境+demo

在添加配置時,千萬要小心不要把PATH 配置錯了,如果出錯,你的幾乎所有指令都會執行不了,曾經我就做了這樣一件傻事,查了老半天才解決,如果你也跟我一樣傻,那就要重新給path指派,path後面的值需要在一台正常的linux上執行

echo $PATH

指令檢視

重新給PATH指派
export PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin
           

然後重新配置環境變量

  1. 安裝zookeeper ,zookeeper依賴jdk 是以,要在jdk安裝完成之後再安裝zookeeper

    3.1 将zookeeper安裝檔案放到soft目錄下,進行解壓。

    這是zookeeper的目錄結構

    linux搭建kafka單機+叢集環境+demo
    剛開始安裝的zookeeper在conf目錄下提供了一個配置檔案的模闆,名字為zoo_sample.cfg. 但是zookeeper的預設配置檔案是zoo.cfg 是以這裡要改一下名字,你可以複制一份,再改名字,我這裡是直接改了:
mv  zoo_sample.cfg  zoo.cfg
           
linux搭建kafka單機+叢集環境+demo

zoo.cfg檔案内容:如果隻是平時學習之類的,配置不用修改,可以直接使用,生産環境需要配置

linux搭建kafka單機+叢集環境+demo

3.2. 啟動zookeeper。操作在zookeeper根目錄下執行的指令

./bin/zkServer.sh start
           

關閉zookeeper

./bin/zkServer.sh stop
           
  1. 終于到kafka了

    4.1 将下載下傳的kafka安裝檔案放到soft目錄下,解壓

    4.2 啟動kafka,要修改配置檔案中的zookeeper位址

tar -zvxf kafka_2.11-2.0.0.tgz 
           
vi config/server.properties
           
linux搭建kafka單機+叢集環境+demo

以守護程序的方式啟動kafka

sh bin/kafka-server-start.sh -daemon config/server.properties 
           

建立topic

bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092
           

在topic下發送消息

bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092
hello kafka
           

消費者消費topic中的消息

bin/kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092
           

停止kafka

./bin/kafka-server-stop.sh 
           

到此就完成了kafka的搭建,官方也有很詳細的quickstart 指導

http://kafka.apache.org/quickstart

二. 叢集環境搭建

這是我要搭的叢集環境的

linux搭建kafka單機+叢集環境+demo
  1. 準備工作

    四台centos7虛拟機,jdk 1.8, zookeeper, kafka.

    zookeeper 使用單機環境安裝的那台機器

    這裡按照單機環境安裝方式分别給另外三台虛拟機安裝kafka,xshell提供了一種快捷方式,可以同時操作多個視窗

    linux搭建kafka單機+叢集環境+demo

    這樣子,在一個視窗敲指令,其他視窗也會有響應,就可以同時給三台機器安裝kafka了。

    安裝完成之後要修改conf/server.properties檔案,有三處地方需要修改,注意一下,如果你打開了同時操作多個視窗,修改配置檔案的時候一定要關掉,單獨修改。

如果出現broker.id沖突的問題,确認一下,每個kafka的broker.id是否唯一,另外,在/tmp/kafka-logs目錄下有一個meta.properties檔案,其中也有配置broker.id ,要確定這個id和server.properties中的id要一緻

broker.id=2  //這個id要保證唯一,三台機器不能相同
listeners=PLAINTEXT://192.168.56.12:9092  //這個ip要改成目前centos的ip
zookeeper.connect=192.168.56.13:2181 //這裡是zookeeper的ip位址
           

然後按照之前的方式啟動就ok了

最奉上一個小demo

依賴的元件

<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
           

生産者:SypKafkaProducer.java

package com.syp.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * @Author: SYP
 * @Date: 2020/8/19
 * @Description:
 */
public class SypKafkaProducer extends Thread{

    KafkaProducer<Integer,String> producer;
    String topic;

    public SypKafkaProducer(String topic){
        this.topic = topic;
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.56.10:9092,192.168.56.11:9092,192.168.56.12:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG,"syp-consumer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartition.class);

        producer = new KafkaProducer<Integer, String>(properties);
    }

    @Override
    public void run() {
        int num = 0;
        String msg = "mymsg"+num;
        while(num < 20){
            try{
                RecordMetadata recordMetadata = producer.send(new ProducerRecord<Integer, String>(topic,msg)).get();
                System.out.println(recordMetadata.offset()+"->"+recordMetadata.partition()+"->"+recordMetadata.topic());
                TimeUnit.SECONDS.sleep(2);
                ++num;
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new SypKafkaProducer("test").start();
    }


}

           

消費者:SypKafkaConsumer.java

package com.syp.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @Author: SYP
 * @Date: 2020/8/19
 * @Description:
 */
public class SypKafkaConsumer extends Thread{

    KafkaConsumer<Integer,String> consumer;
    String topic;

    public SypKafkaConsumer(String topic){
        this.topic = topic;
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.56.10:9092,192.168.56.11:9092,192.168.56.12:9092");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "syp-consumer");
        //group_id用來解決釋出訂閱模式
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "syp-gid");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        consumer = new KafkaConsumer<Integer, String>(properties);
    }

    @Override
    public void run() {
        consumer.subscribe(Collections.singleton(this.topic));
        while(true){
            ConsumerRecords<Integer, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(record ->
                    System.out.println(record.key()+"->"+record.value()+"->"+record.offset()));
        }
    }

    public static void main(String[] args) {
        new SypKafkaConsumer("test").start();
    }

}