天天看點

ELK日志中心叢集之——kafka簡介與配置ELK日志中心叢集之——kafka簡介和配置

ELK日志中心叢集之——kafka簡介和配置

kafka介紹

kafka是Apache基金會的開源項目,是一個分布式的消息釋出-訂閱系統,他的特性有:

  • 吞吐量高,時延低:kafka每秒可以處理幾十萬條資料,延遲最低隻有幾毫秒。
  • 可擴充性:kafka支援熱擴充。
  • 可靠性:消息被持久化到本地磁盤,支援資料備份。
  • 容錯性:允許叢集中其他節點失敗。
  • 抗高并發:支援數千個用戶端同時讀寫。
    ELK日志中心叢集之——kafka簡介與配置ELK日志中心叢集之——kafka簡介和配置
    kafka元件介紹
  • topic:特定類型的消息流。消息是位元組的有效負載(Payload),topic是消息的分類名,kafka是面向topi的。
  • producer:生産者,産生消息資料的伺服器。
  • broker:儲存已釋出的消息資料的一組伺服器。
  • consumer:消費者,訂閱(檢視,消費)topic,從broker中拉取資料。
  • partition:區,每個topic包含一個或多個partition。
  • consumer group:每個consumer都屬于一個group,但同一條消息隻能被一個每個group中的一個consumer消費,可以同時被多個group消費。
  • replication:partition的副本,保證kafka的高可用。

kafka配置

1.安裝jdk,略。

2.在apache官網擷取kafka包,解壓到/usr/local/

kafka是基于zookeeeper的,是以要先配置zookeeper,zookeeper包已經包含在kafka包中,無需另外下載下傳。

3.重寫zookeeper配置檔案。

[[email protected] ~]# vi /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties 
寫入以下内容:
dataDir=/data/zookeeper/data   #存放資料資訊的目錄
dataLogDir=/data/zookeeper/logs   #存放自身的日志的目錄
clientPort=2181 
tickTime=2000 
initLimit=20 
syncLimit=10 
server.1=192.168.178.132:2888:3888   #叢集中每一個節點的ip和端口
server.2=192.168.178.133:2888:3888
           

配置檔案解釋:

  • dataDir:ZK資料存放目錄。
  • dataLogDir:ZK日志存放目錄。
  • clientPort:用戶端連接配接ZK服務的端口。
  • tickTime&:ZK伺服器之間或用戶端與伺服器之間維持心跳的時間間隔。
  • initLimit:允許follower(相對于Leaderer言的“用戶端”)連接配接并同步到Leader的初始化連接配接時間,以tickTime為機關。當初始化連接配接時間超過該值,則表示連接配接失敗。
  • syncLimit      Leader與Follower之間發送消息時,請求和應答時間長度。如果follower在設定時間内不能與leader通信,那麼此follower将會被丢棄。
  • server.1=172.16.244.31:2888:3888 2888是follower與leader交換資訊的端口,3888是當leader挂了時用來執行選舉時伺服器互相通信的端口。

4.建立step3中兩個目錄

[[email protected] ~]# mkdir -p /data/zookeeper/data 
[[email protected] ~]# mkdir -p /data/zookeeper/logs
           

5.建立myid檔案以辨別叢集中的每個節點

每個節點myid檔案中的id不能一樣。

[[email protected] ~]# echo 1 > /data/zookeeper/data/myid
           

6.重寫kafka配置檔案

[[email protected] ~]# vi /usr/local/kafka_2.11-2.1.0/config/server.properties
寫入以下内容:
broker.id=1  #kafka不同節點的辨別
listeners=PLAINTEXT://192.168.178.132:9092  #監聽,即本機ip
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs    #存放kafka日志的目錄,需要自己建立
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.178.131:2181,192.168.178.132:2181  #參與叢集的節點的ip,包括本機。
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
           

配置檔案内容解釋:

  • broker.id:每個server需要單獨配置broker id,如果不配置系統會自動配置。
  • listeners:監聽位址,格式PLAINTEXT://IP:端口。
  • num.network.threads:接收和發送網絡資訊的線程數。
  • num.io.threads:伺服器用于處理請求的線程數,其中可能包括磁盤I/O。
  • socket.send.buffer.bytes:套接字伺服器使用的發送緩沖區(SO_SNDBUF)
  • socket.receive.buffer.bytes:套接字伺服器使用的接收緩沖區(SO_RCVBUF)
  • socket.request.max.bytes:套接字伺服器将接受的請求的最大大小(防止OOM)
  • log.dirs: 日志檔案目錄。
  • num.partitions:partition數量。
  • num.recovery.threads.per.data.dir:在啟動時恢複日志、關閉時刷盤日志每個資料目錄的線程的數量,預設1。
  • offsets.topic.replication.factor:偏移量話題的複制因子(設定更高保證可用),為了保證有效的複制,偏移話題的複制因子是可配置的,在偏移話題的第一次請求的時候可用的broker的數量至少為複制因子的大小,否則要麼話題建立失敗,要麼複制因子取可用broker的數量和配置複制因子的最小值。
  • log.retention.hours:日志檔案删除之前保留的時間(機關小時),預設168
  • log.segment.bytes:單個日志檔案的大小,預設1073741824
  • log.retention.check.interval.ms:檢查日志段以檢視是否可以根據保留政策删除它們的時間間隔。
  • zookeeper.connect:ZK主機位址,如果zookeeper是叢集則以逗号隔開。
  • zookeeper.connection.timeout.ms:連接配接到Zookeeper的逾時時間。

7.建立6中相應的kafka日志存放目錄

[[email protected] ~]# mkdir -p /data/kafka/logs
           

8. 其他的節點做相同的配置

注意/data/zookeeper/data/下的myid檔案中不能重複,

kafka配置檔案中的broker.id不能重複。

9.每個節點依次啟動zookeeper

[[email protected] ~]# cd /usr/local/kafka
[[email protected] kafka]# nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
           

10.每個節點依次驗證zookeeper

若沒有nc,可以先provides查找,再yum安裝一下。

[[email protected] ~]# echo conf | nc 127.0.0.1 2181
           

看見以下現象說明啟動成功

clientPort=2181
dataDir=/data/zookeeper/data/version-2
dataLogDir=/data/zookeeper/logs/version-2
tickTime=2000
maxClientCnxns=60
minSessionTimeout=4000
maxSessionTimeout=40000
serverId=1
initLimit=20
syncLimit=10
electionAlg=3
electionPort=3888
quorumPort=2888
peerType=0
[[email protected] ~]# echo conf | nc 127.0.0.1 2181
           

同樣的再次驗證

[[email protected] ~]# echo stat | nc 127.0.0.1 2181
           

看到以下現象則成功

Latency min/avg/max: 0/0/0
Received: 4
Sent: 3
Connections: 1
Outstanding: 0
Zxid: 0x0
Mode: follower
Node count: 4
           

11.每個節點依次啟動kafka

[[email protected] ~]# cd /usr/local/kafka
[[email protected] kafka]# nohup bin/kafka-server-start.sh config/server.properties &
           

12.kafka的使用

  • 建立topic
[[email protected] kafka]# bin/kafka-topics.sh --create --zookeeper 192.168.178.132:2181 --replication-factor 1 --partitions 1 --topic yiki
           

–zookeeper後面是本機的ip和kafka的端口号。

–topic後面是建立的topic名,可以自己定義。

  • 查詢topic
[[email protected] kafka]# bin/kafka-topics.sh --zookeeper 192.168.178.132:2181 --list
           

–zookeeper後面是想要查詢的kafka伺服器上内容的伺服器ip,由于叢集,是以每一個節點上的資料是一樣的。

  • 生産者消費者實時同步

    生産者端

[[email protected] kafka]# bin/kafka-console-producer.sh --broker-list 192.168.178.131:9092 --topic yiki  
>
           

–broker-list後面是想要将資料傳給的對端消費者ip,

–topic後面是釋出的topic話名。

消費者端

[[email protected] kafka]# bin/kafka-console-consumer.sh --bootstrap-server  192.168.178.131:9092 --topic yiki --from-beginning
           

–bootstrap-server後面是消費者自身的ip,要與生産者端一緻,

–topic後面是訂閱的topic名。

繼續閱讀