一. 單機環境
-
準備工作:
1.1. 一台或兩台centos虛拟機,一台給zookeeper(192.168.56.13),一台給kafka(192.168.56.10)。也可以将zookeeper和kafka放在同一台虛拟機上
1.2. jdk 1.8
https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
1.3. zookeeper
https://zookeeper.apache.org/releases.html
1.4. kafka
http://kafka.apache.org/downloads
我所有的工具軟體都安裝再/usr/local/soft目錄下
-
安裝jdk
2.1 切換到/usr/local目錄下,建立soft目錄,并将下載下傳的jdk放到/usr/local/soft目錄下解壓
cd /usr/local
mkdir soft
cd soft
tar -zxvf jdk-8u261-linux-x64.tar.gz
配置java環境變量
vi /etc/profile
2.2 在profile檔案末尾添加如下配置,
export JAVA_HOME=/usr/local/soft/jdk1.8.0_261
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:$CLASSPATH
export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin
export PATH=$PATH:${JAVA_PATH}
儲存退出後,執行
source /etc/profile
使修改立即生效
檢視是否配置成功
java -version
如果出現如下内容,則說明配置成功

在添加配置時,千萬要小心不要把PATH 配置錯了,如果出錯,你的幾乎所有指令都會執行不了,曾經我就做了這樣一件傻事,查了老半天才解決,如果你也跟我一樣傻,那就要重新給path指派,path後面的值需要在一台正常的linux上執行
echo $PATH
指令檢視
重新給PATH指派
export PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin
然後重新配置環境變量
-
安裝zookeeper ,zookeeper依賴jdk 是以,要在jdk安裝完成之後再安裝zookeeper
3.1 将zookeeper安裝檔案放到soft目錄下,進行解壓。
這是zookeeper的目錄結構
剛開始安裝的zookeeper在conf目錄下提供了一個配置檔案的模闆,名字為zoo_sample.cfg. 但是zookeeper的預設配置檔案是zoo.cfg 是以這裡要改一下名字,你可以複制一份,再改名字,我這裡是直接改了:linux搭建kafka單機+叢集環境+demo
mv zoo_sample.cfg zoo.cfg
zoo.cfg檔案内容:如果隻是平時學習之類的,配置不用修改,可以直接使用,生産環境需要配置
3.2. 啟動zookeeper。操作在zookeeper根目錄下執行的指令
./bin/zkServer.sh start
關閉zookeeper
./bin/zkServer.sh stop
-
終于到kafka了
4.1 将下載下傳的kafka安裝檔案放到soft目錄下,解壓
4.2 啟動kafka,要修改配置檔案中的zookeeper位址
tar -zvxf kafka_2.11-2.0.0.tgz
vi config/server.properties
以守護程序的方式啟動kafka
sh bin/kafka-server-start.sh -daemon config/server.properties
建立topic
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092
在topic下發送消息
bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092
hello kafka
消費者消費topic中的消息
bin/kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092
停止kafka
./bin/kafka-server-stop.sh
到此就完成了kafka的搭建,官方也有很詳細的quickstart 指導
http://kafka.apache.org/quickstart
二. 叢集環境搭建
這是我要搭的叢集環境的
-
準備工作
四台centos7虛拟機,jdk 1.8, zookeeper, kafka.
zookeeper 使用單機環境安裝的那台機器
這裡按照單機環境安裝方式分别給另外三台虛拟機安裝kafka,xshell提供了一種快捷方式,可以同時操作多個視窗
linux搭建kafka單機+叢集環境+demo 這樣子,在一個視窗敲指令,其他視窗也會有響應,就可以同時給三台機器安裝kafka了。
安裝完成之後要修改conf/server.properties檔案,有三處地方需要修改,注意一下,如果你打開了同時操作多個視窗,修改配置檔案的時候一定要關掉,單獨修改。
如果出現broker.id沖突的問題,确認一下,每個kafka的broker.id是否唯一,另外,在/tmp/kafka-logs目錄下有一個meta.properties檔案,其中也有配置broker.id ,要確定這個id和server.properties中的id要一緻
broker.id=2 //這個id要保證唯一,三台機器不能相同
listeners=PLAINTEXT://192.168.56.12:9092 //這個ip要改成目前centos的ip
zookeeper.connect=192.168.56.13:2181 //這裡是zookeeper的ip位址
然後按照之前的方式啟動就ok了
最奉上一個小demo
依賴的元件
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
生産者:SypKafkaProducer.java
package com.syp.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* @Author: SYP
* @Date: 2020/8/19
* @Description:
*/
public class SypKafkaProducer extends Thread{
KafkaProducer<Integer,String> producer;
String topic;
public SypKafkaProducer(String topic){
this.topic = topic;
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.56.10:9092,192.168.56.11:9092,192.168.56.12:9092");
properties.put(ProducerConfig.CLIENT_ID_CONFIG,"syp-consumer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartition.class);
producer = new KafkaProducer<Integer, String>(properties);
}
@Override
public void run() {
int num = 0;
String msg = "mymsg"+num;
while(num < 20){
try{
RecordMetadata recordMetadata = producer.send(new ProducerRecord<Integer, String>(topic,msg)).get();
System.out.println(recordMetadata.offset()+"->"+recordMetadata.partition()+"->"+recordMetadata.topic());
TimeUnit.SECONDS.sleep(2);
++num;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
new SypKafkaProducer("test").start();
}
}
消費者:SypKafkaConsumer.java
package com.syp.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* @Author: SYP
* @Date: 2020/8/19
* @Description:
*/
public class SypKafkaConsumer extends Thread{
KafkaConsumer<Integer,String> consumer;
String topic;
public SypKafkaConsumer(String topic){
this.topic = topic;
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.56.10:9092,192.168.56.11:9092,192.168.56.12:9092");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "syp-consumer");
//group_id用來解決釋出訂閱模式
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "syp-gid");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumer = new KafkaConsumer<Integer, String>(properties);
}
@Override
public void run() {
consumer.subscribe(Collections.singleton(this.topic));
while(true){
ConsumerRecords<Integer, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
consumerRecords.forEach(record ->
System.out.println(record.key()+"->"+record.value()+"->"+record.offset()));
}
}
public static void main(String[] args) {
new SypKafkaConsumer("test").start();
}
}