
前提
- docker
- docker-compose
其中docker-compose不是必須的,單單使用docker也是可以的,這裡主要介紹docker和docker-compose兩種方式
docker部署
docker部署kafka非常簡單,隻需要兩條指令即可完成kafka伺服器的部署。
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --link zookeeper -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.60(機器IP):9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
由于kafka是需要和zookeeper共同工作的,是以需要部署一個zookeeper,但有了docker這對部署來說非常輕松.
可以通過docker ps檢視到兩個容器的狀态,這裡不再展示.
接下來可以進行生産者和消費者的嘗試
通過kafka自帶工具生産消費消息測試
- 首先,進入到kafka的docker容器中
docker exec -it kafka sh
- 運作消費者,進行消息的監聽
kafka-console-consumer.sh --bootstrap-server 192.168.1.60:9094 --topic kafeidou --from-beginning
- 打開一個新的ssh視窗,同樣進入kafka的容器中,執行下面這條指令生産消息
kafka-console-producer.sh --broker-list 192.168.1.60(機器IP):9092 --topic kafeidou
輸入完這條指令後會進入到控制台,可以輸入任何想發送的消息,這裡發送一個hello
>>
>hello
>
>
>
- 可以看到,在生産者的控制台中輸入消息後,消費者的控制台立刻看到了消息
到目前為止,一個kafka完整的hello world就完成了.kafka的部署加上生産者消費者測試.
通過java代碼進行測試
- 建立一個maven項目并加入以下依賴
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.11.0.2</version>
</dependency>
-
生産者代碼
producer.java
import org.apache.kafka.clients.producer.*;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
public class HelloWorldProducer {
public static void main(String[] args) {
long events = 30;
Random rnd = new Random();
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.60:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("message.timeout.ms", "3000");
Producer<String, String> producer = new KafkaProducer<>(props);
String topic = "kafeidou";
for (long nEvents = 0; nEvents < events; nEvents++) {
long runtime = new Date().getTime();
String ip = "192.168.2." + rnd.nextInt(255);
String msg = runtime + ",www.example.com," + ip;
System.out.println(msg);
ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, ip, msg);
producer.send(data,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null) {
e.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
}
});
}
System.out.println("send message done");
producer.close();
System.exit(-1);
}
}
-
消費者代碼
consumer.java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class HelloWorldConsumer2 {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.60:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG ,"kafeidou_group") ;
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put("auto.offset.reset", "earliest");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("kafeidou"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
-
分别運作生産者和消費者即可
生産者列印消息
1581651496176,www.example.com,192.168.2.219
1581651497299,www.example.com,192.168.2.112
1581651497299,www.example.com,192.168.2.20
消費者列印消息
offset = 0, key = 192.168.2.202, value = 1581645295298,www.example.com,192.168.2.202
offset = 1, key = 192.168.2.102, value = 1581645295848,www.example.com,192.168.2.102
offset = 2, key = 192.168.2.63, value = 1581645295848,www.example.com,192.168.2.63
源碼位址:FISHStack/kafka-demo
通過docker-compose部署kafka
首先建立一個docker-compose.yml檔案
version: '3.7'
services:
zookeeper:
image: wurstmeister/zookeeper
volumes:
- ./data:/data
ports:
- 2182:2181
kafka9094:
image: wurstmeister/kafka
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 0
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.60:9092
KAFKA_CREATE_TOPICS: "kafeidou:2:0" #kafka啟動後初始化一個有2個partition(分區)0個副本名叫kafeidou的topic
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
volumes:
- ./kafka-logs:/kafka
depends_on:
- zookeeper
部署起來很簡單,在docker-compose.yml檔案的目錄下執行docker-compose up -d就可以了,測試方式和上面的一樣。
這個docker-compose做的東西比上面docker方式部署的東西要多一些
- 資料持久化,在目前目錄下挂在了兩個目錄分别存儲zookeeper和kafka的資料,當然在docker run 指令中添加 -v 選項也是可以做到這樣的效果的
- kafka在啟動後會初始化一個有分區的topic,同樣的,docker run的時候添加 -e KAFKACREATETOPICS=kafeidou:2:0 也是可以做到的。
總結:優先推薦docker-compose方式部署
為什麼呢?
因為單純使用docker方式部署的話,如果有改動(例如:修改對外開放的端口号)的情況下,docker需要把容器停止docker stop 容器ID/容器NAME,然後删除容器docker rm 容器ID/容器NAME,最後啟動新效果的容器docker run ...
而如果在docker-compose部署的情況下如果修改内容隻需要修改docker-compose.yml檔案對應的地方,例如2181:2181改成2182:2182,然後再次在docker-compose.yml檔案對應的目錄下執行docker-compose up -d就能達到更新後的效果。
始發于 四顆咖啡豆 釋出!
關注公衆号->[四顆咖啡豆] 擷取最新内容