安裝JDK1.8
1、搜尋jdk安裝包
yum search java|grep jdk
複制
2、下載下傳jdk1.8,下載下傳之後預設的目錄為: /usr/lib/jvm/
yum install java-1.8.0-openjdk
複制
安裝zookeeper
安裝zookeeper
kafka依賴zookeeper,是以需要下載下傳安裝zookeeper
# 下載下傳壓縮包
wget http://archive.apache.org/dist/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
# 解壓
tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz
複制
修改配置檔案
cd apache-zookeeper-3.7.0-bin/conf/
mv zoo_sample.cfg zoo.cfg
複制
啟動zookeeper
cd ../bin/
./zkServer.sh start
複制
出現以下資訊表示啟動成功
[root@localhost apache-zookeeper-3.7.0-bin]# bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /root/apache-zookeeper-3.7.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
複制
啟動異常
如果出現
already running as process
錯誤,這個一般是因為機器異常關閉緩存目錄中殘留PID檔案導緻的(為關閉程序強行關機等導緻的)
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIwIjNx8CX39CXy8CXycXZpZVZnFWbp9zZuBnLxYTO1YWNyIzNkN2MxY2NmN2MxYWMlFDN3kDZjNmYiF2LcZTOxMzNxkzLcVmdhNXLwRHdo9CXt92YucWbpRWdvx2Yx5yazF2Lc9CX6MHc0RHaiojIsJye.png)
解決方案:到配置檔案
conf/zoo.cfg
查找
dataDir
配置的目錄
dataDir=/tmp/zookeeper
複制
到
dataDir
目錄下,清理緩存檔案
cd /tmp/zookeeper
rm -rf zookeeper_server.pid
複制
安裝kafka
下載下傳并解壓
wget https://downloads.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz
tar -zxvf kafka_2.13-3.2.1.tgz
cd kafka_2.13-3.2.1
複制
修改配置
config/server.properties
中
listeners
配置項
# 預設為:
#listeners=PLAINTEXT://:9092
# 修改為:
listeners=PLAINTEXT://192.168.10.232:9092
複制
這裡需要修改監聽位址,否則無法在另外的主機中連接配接kafka
修改後,監聽位址需改為:
,否則會出現如下錯誤:
IP位址:端口
[2022-08-05 10:40:56,361] WARN [Consumer clientId=console-consumer, groupId=console-consumer-65957] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-08-05 10:40:56,362] WARN [Consumer clientId=console-consumer, groupId=console-consumer-65957] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
複制
啟動kafka
bin/kafka-server-start.sh config/server.properties
複制
建立主題
bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic topic1 --bootstrap-server 192.168.10.232:9092
複制
生産者發送消息
bin/kafka-console-producer.sh --topic topic1 --bootstrap-server 192.168.10.232:9092
複制
消費者接收消息
bin/kafka-console-consumer.sh --topic topic1 --from-beginning --bootstrap-server 192.168.10.232:9092
複制
golang中使用kafka
安裝golang用戶端
go get github.com/Shopify/sarama
go get github.com/bsm/sarama-cluster
複制
使用golang建立同步消息生産者
package main
import (
"fmt"
"github.com/Shopify/sarama"
"log"
"time"
)
var address = []string{"192.168.10.232:9092"}
func main() {
// 配置
config := sarama.NewConfig()
// 設定屬性
config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second
producer, err := sarama.NewSyncProducer(address, config)
if err != nil {
log.Printf("new sync producer error: %s \n", err.Error())
return
}
// 關閉生産者
defer producer.Close()
// 循環發送消息
for i := 0; i < 10; i++ {
// 建立消息
value := fmt.Sprintf("sync message, index = %d", i)
msg := &sarama.ProducerMessage{
Topic: "topic1", // 主題名稱
Value: sarama.ByteEncoder(value), // 消息内容
}
// 發送消息
part, offset, err := producer.SendMessage(msg)
if err != nil {
log.Printf("send message error: %s \n", err.Error())
} else {
fmt.Printf("SUCCESS: value=%s, partition=%d, offset=%d \n", value, part, offset)
}
// 每隔兩秒發送一條消息
time.Sleep(2 * time.Second)
}
}
複制
使用golang建立異步消息生産者
package main
import (
"fmt"
"github.com/Shopify/sarama"
"log"
"time"
)
var address = []string{"192.168.10.232:9092"}
func main() {
// 配置
config := sarama.NewConfig()
// 等待伺服器所有副本都儲存成功後的響應
config.Producer.RequiredAcks = sarama.WaitForAll
// 随機向partition發送消息
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 是否等待成功和失敗後的響應,隻有上面的RequireAcks設定不是NoReponse這裡才有用
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
// 設定使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap沒有作用,需要消費和生産同時配置
// 注意,版本設定不對的話,kafka會傳回很奇怪的錯誤,并且無法成功發送消息
config.Version = sarama.V0_10_0_1
fmt.Println("start make producer")
//使用配置,建立一個異步生産者
producer, err := sarama.NewAsyncProducer(address, config)
if err != nil {
log.Printf("new async producer error: %s \n", err.Error())
return
}
defer producer.AsyncClose()
// 循環判斷哪個通道發送過來資料
fmt.Println("start goroutine")
go func(p sarama.AsyncProducer) {
for {
select {
case suc := <-p.Successes():
fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
case fail := <-p.Errors():
fmt.Println("error: ", fail.Error())
}
}
}(producer)
var value string
for i := 0; ; i++ {
// 每隔兩秒發送一條消息
time.Sleep(2 * time.Second)
// 建立消息
value = fmt.Sprintf("async message, index = %d", i)
// 注意:這裡的msg必須得是新建構的變量,不然你會發現發送過去的消息内容都是一樣的,因為批次發送消息的關系
msg := &sarama.ProducerMessage{
Topic: "topic1",
Value: sarama.ByteEncoder(value),
}
// 使用通道發送
producer.Input() <- msg
}
}
複制
使用golang建立消息消費者
package main
import (
"fmt"
"os"
"os/signal"
cluster "github.com/bsm/sarama-cluster"
)
func main() {
// 配置
config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
config.Consumer.Offsets.Initial = -2
config.Consumer.Offsets.CommitInterval = 1 * time.Second
config.Group.Return.Notifications = true
// 建立消費者
brokers := []string{"192.168.10.232:9092"}
topics := []string{"topic1"}
consumer, err := cluster.NewConsumer(brokers, "consumer-group", topics, config)
if err != nil {
fmt.Printf("new consumer error: %s\n", err.Error())
return
}
defer consumer.Close()
// trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
go func() {
for err := range consumer.Errors() {
fmt.Printf("consumer error: %s", err.Error())
}
}()
go func() {
for ntf := range consumer.Notifications() {
fmt.Printf("consumer notification error: %v \n", ntf)
}
}()
// 循環從通道中擷取消息
for {
select {
case msg, ok := <-consumer.Messages():
if ok {
fmt.Printf("%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg, "") // 上報offset
} else {
fmt.Println("監聽服務失敗")
}
case <-signals:
return
}
}
}
複制
連結
DEMO:https://github.com/cqcqs/go-kafka-demo