Kafka快速入門+kafka安裝
- KAFKA實際上是一個消息隊列架構
Kafka快速入門
消息系統
- 隊列
- 消息隊列及, 隊列中放的是消息
Kafka簡介
- 由多台kafka的伺服器構成kafka叢集,來對消息進行管理
- 每一台kafka的伺服器又稱之為一個節點(節點在kafka中叫Broker)
- 每一個節點對應一個Broker
- kafka的消息是分主題(topic)來存儲的,每個主題(topic)用來存儲不同的消息。釋出消息要指定主題(topic)來存儲
- 取消息時也要指定主題
- 一個節點可以包含多個主題(Topic)
- 分區是為了分布式存儲
- Message 是放在對應的分區裡面的
- Consumer Group:
- Zookeeper:kafka的消息(,Message)身并不是存在Zookeeper,Zookeeper用于存儲有kafaka的中繼資料資訊(如:kafka叢集有多少個節點(Broker)、主題(Topic)的名稱等),是以kafaka依賴于Zookeeper,需要搭建Zookeeper伺服器
-
什麼是中繼資料
任何檔案系統中的資料分為資料和中繼資料。資料是指普通檔案中的實際資料,而元
資料指用來描述一個檔案的特征的系統資料,諸如通路權限、檔案擁有者以及檔案資料
塊的分布資訊(inode…)等等。在叢集檔案系統中,分布資訊包括檔案在磁盤上的位置以及磁盤在叢集中的位置。使用者需要操作一個檔案必須首先得到它的中繼資料,才能定位到檔案的位置并且得到檔案的内容或相關屬性。
安裝Kafka
Kafka一般安裝在linux伺服器
- 如果沒有安裝zoopkeeper可以用kafka内内置的zoopkeeper,但一般實際項目中都是用自己安裝的zookeeper
- 如果使用内置zoopkeeper執行如下指令:
> bin/zookeeper-server-start.sh config/zookeeper.properties
- 如果自己安裝指向如下步驟:
- zoopkeeper啟動配置
- 在配置檔案中可以看到zoopkeeper資料預設存放的位置
- 可以根據需要修改 == 注意:以下步驟最好進入root 權限,使用指令 su - 進入root ==
- 修改好配置檔案後啟動zookeeper服務,啟動後如果使用JPS看不到啟動的zookeeper線程,或者使用 ./zkServer.sh status提示如下錯入,說明你需要使用root權限
- 用以上方式直接啟動kafka會以前台的方式運作,阻塞其他程序的運作,以下兩種方式可以實作kafka後運作
- 利用kafka提供背景運作方法
- 利用linux自身提供的方法
- 啟動後使用jps 檢視程序,jps是jdk提供的用來檢視所有java程序的指令
- 依然保持路徑在kafka的bin檔案下
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic hello
./kafka-topics.sh --list --zookeeper localhost:2181
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic hello
- 主題建立完成後,生産者就可以往topic放裡釋出消息了
- kafka為了友善測試,提供了一個模拟的生産者和消費者
./kafka-console-producer.sh --broker-list localhost:9092 --topic hello
- 使用FinalShell連接配接伺服器,和producer為同一台伺服器
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --from-beginning
- 現在生産者釋出消息,訂閱了hello主題的消費者立刻就可以收到,消費者可以有多個
- 檢視data資料存放目錄:
get /brokers/topics/hello/partitions/0/state
- 檢視zookeeper中的内容:get /brokers/topics/hello/partitions/0/state
- 一般不通過登入zookeeper來檢視中繼資料,而是使用kafka提供的一些工具指令
- 在配置檔案中修改添加topic删除功能
#是否可以删除topic,預設為false
delete.topic.enable=true
- 啟動後會給出topic删除功能開啟提示日志
Kafka配置檔案
############################# Server Basics #############################
# broker的id,值為整數,且必須唯一,在一個叢集中不能重複
broker.id=0
############################# Socket Server Settings #############################
# kafka預設監聽的端口為9092
#listeners=PLAINTEXT://:9092
# 處理網絡請求的線程數量,預設為3個
num.network.threads=3
# 執行磁盤IO操作的線程數量,預設為8個
num.io.threads=8
# socket服務發送資料的緩沖區大小,預設100KB
socket.send.buffer.bytes=102400
# socket服務接受資料的緩沖區大小,預設100KB
socket.receive.buffer.bytes=102400
# socket服務所能接受的一個請求的最大大小,預設為100M
socket.request.max.bytes=104857600
############################# Log Basics #############################
# kafka存儲消息資料的目錄
log.dirs=../data
# 每個topic預設的partition數量
num.partitions=1
# 在啟動時恢複資料和關閉時重新整理資料時每個資料目錄的線程數量
num.recovery.threads.per.data.dir=1
############################# Log Flush Policy #############################
# 消息重新整理到磁盤中的消息條數門檻值
#log.flush.interval.messages=10000
# 消息重新整理到磁盤中的最大時間間隔
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# 日志保留小時數,逾時會自動删除,預設為7天
log.retention.hours=168
# 日志保留大小,超出大小會自動删除,預設為1G
#log.retention.bytes=1073741824
# 日志分片政策,單個日志檔案的大小最大為1G,超出後則建立一個新的日志檔案
log.segment.bytes=1073741824
# 每隔多長時間檢測資料是否達到删除條件
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper連接配接資訊,如果是zookeeper叢集,則以逗号隔開
zookeeper.connect=localhost:2181
# 連接配接zookeeper的逾時時間
zookeeper.connection.timeout.ms=6000
## Kafka叢集
## SpringBoot內建Kafka
# 是否可以删除topic,預設為false
delete.topic.enable=true
kafka叢集搭建
- 如果隻有一台電腦,我們可以模拟搭建叢集伺服器:可以在一台主機上啟動多個zk服務,配置使用不同的端口即可。
1. 搭建zk叢集
-
搭建zk叢集
在一台主機上啟動多個zk服務,配置使用不同的端口
步驟:
1). 拷貝多個zk目錄
zookeeper1、zookeeper2、zookeeper3
2). 分别配置每個zk
- 修改zookeeper配置檔案
- 修改zookeeper配置檔案
vi zookeeper1/conf/zoo.cfg
clientPort=2181
server.1=192.168.2.153:6661:7771
server.2=192.168.2.153:6662:7772
server.3=192.168.2.153:6663:7773
echo 1 > zookeeper1/data/myid
vi zookeeper2/conf/zoo.cfg
clientPort=2182
server.1=192.168.2.153:6661:7771
server.2=192.168.2.153:6662:7772
server.3=192.168.2.153:6663:7773
echo 2 > zookeeper2/data/myid
vi zookeeper3/conf/zoo.cfg
clientPort=2183
server.1=192.168.2.153:6661:7771
server.2=192.168.2.153:6662:7772
server.3=192.168.2.153:6663:7773
echo 3 > zookeeper3/data/myid
3). 啟動zk叢集
2. 搭建Kafka叢集
步驟:
1). 拷貝多個kafka目錄
kafka1、kafka2、kafka3
2). 分别配置每個kafka)
- 複制kafka2和kafka3
vi kafka1/config/server.properties
broker.id=1
listeners=PLAINTEXT://192.168.2.153:9091
zookeeper.connect=192.168.2.153:2181,192.168.2.153:2182,192.168.2.153:2183
vi kafka2/config/server.properties
broker.id=2
listeners=PLAINTEXT://192.168.2.153:9092
zookeeper.connect=192.168.2.153:2181,192.168.2.153:2182,192.168.2.153:2183
vi kafka3/config/server.properties
broker.id=3
listeners=PLAINTEXT://192.168.2.153:9093
zookeeper.connect=192.168.2.153:2181,192.168.2.153:2182,192.168.2.153:2183
3). 啟動kafka叢集
- 進入kafka1的bin目錄啟動kafka1
- 進入kafka2的bin目錄啟動kafka2
- 進入kafka3的bin目錄啟動kafka3 4). 建立Topic
./kafkatopics.sh \
--create\
--zookeeper 192.168.7.40:2181,192.168.7.40:2182,192.168.7.40:2183 \
--replicationfactor 3 \
--partitions 5 \
--topic aaa
5). 生成資料/釋出消息
- 建立主題後就可以生産資料/消費資料了
./kafkaconsoleproducer.sh --brokerlist 192.168.7.40:9091,192.168.7.40:9092,192.168.7.40:9093 --topic aaa
-
Ctrl+c 可以退出發不消息
6). 消費資料/訂閱消息
- 另開一個控制台視窗
./kafkaconsoleconsumer.sh --bootstrapserver 192.168.7.40:9091,192.168.7.40:9092,192.168.7.40:9093 --topic aaa --from-beginning
- 通過zookeeper檢視用戶端(kafka叢集)的中繼資料,因為zookeeper也是一個叢集是以去任意一個zookeeper伺服器檢視都可以。
- 檢視id為1的broker資訊,檢視id為3的broker的 資訊
- 檢視主題(Topic)的分區
五、SpringBoot內建Kafka
-
-
簡介
SpringBoot提供了一個名為springkafka的starter,用于在Spring項目裡快速內建kafka
-
-
-
用法
步驟:
-
建立SpringBoot項目
勾選Spring Web Starter和Spring for Apache Kafka
-
-
- 項目結構
2. 配置kafka,編輯application.yml檔案
spring:
kafka:
# kafka伺服器位址(可以多個)
bootstrap-servers: 192.168.7.40:9091,192.168.7.40:9092,192.168.7.40:9093
producer:
# 每次批量發送消息的數量
batch-size: 65536
buffer-memory: 524288
# key/value的序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
# 指定一個預設的組名
group-id: test
# key/value的反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-
-
建立生産者
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate template;
/**
* 發送消息到Kafka
* @param topic 主題,如果主題不存在,會自動建立主題
* @param message 消息
* @return
*/
@RequestMapping("/sendMsg")
public String sendMsg(String topic, String message){
template.send(topic,message);//消息就被發送到kafka伺服器上存起來了
return "success";
}
}
-
-
建立消費者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
-
@Component
public class KafkaConsumer {
/**
* 訂閱指定主題的消息
* @param record 消息記錄
*/
//通過topics指定訂閱的主題
//@KafkaListener(topics = {"aaa","ccc"})
@KafkaListener(topics = {"aaa"})
//加了 @KafkaListener這個注解的方法,可以通過ConsumerRecord擷取訂閱主題的消息
public void listen(ConsumerRecord record){
// System.out.println(record);
System.out.println(record.topic()+":"+record.value());
}
}
-
-
測試
通路http://localhost:8080/sendMsg?topic=aaa&message=welcome
生産者在aaa主題釋出了消息,訂閱了aaa主題的消費者就收到了消息
-
-
在控制中檢視
System.out.println(record);
-
隻看主題和發送的資料
System.out.println(record.topic()+":"+record.value());
-
通路http://localhost:8080/sendMsg?topic=ccc&message=welcome
@KafkaListener(topics = {“aaa”,“ccc”})
如果ccc主題不存在,kafka伺服器會自動建立ccc主題
- 可以在kafka伺服器檢視相關資訊