Spring Cloud Bus除了支援RabbitMQ的自動化配置之外,還支援現在被廣泛應用的Kafka。在本文中,我們将搭建一個Kafka的本地環境,并通過它來嘗試使用Spring Cloud Bus對Kafka的支援,實作消息總線的功能。由于本文會以之前Rabbit的實作作為基礎來修改,是以先閱讀《Spring Cloud建構微服務架構(七)消息總線》有助于了解本文。
Kafka簡介
Kafka是一個由LinkedIn開發的分布式消息系統,它于2011年初開源,現在由著名的Apache基金會維護與開發。Kafka使用Scala實作,被用作LinkedIn的活動流和營運資料處理的管道,現在也被諸多網際網路企業廣泛地用作為資料流管道和消息系統。
Kafka是基于消息釋出/訂閱模式實作的消息系統,其主要設計目标如下:
- 消息持久化:以時間複雜度為O(1)的方式提供消息持久化能力,即使對TB級以上資料也能保證常數時間複雜度的通路性能。
- 高吞吐:在廉價的商用機器上也能支援單機每秒100K條以上的吞吐量
- 分布式:支援消息分區以及分布式消費,并保證分區内的消息順序
- 跨平台:支援不同技術平台的用戶端(如:Java、PHP、Python等)
- 實時性:支援實時資料處理和離線資料處理
- 伸縮性:支援水準擴充
Kafka中涉及的一些基本概念:
- Broker:Kafka叢集包含一個或多個伺服器,這些伺服器被稱為Broker。
- Topic:邏輯上同Rabbit的Queue隊列相似,每條釋出到Kafka叢集的消息都必須有一個Topic。(實體上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然儲存于一個或多個Broker上,但使用者隻需指定消息的Topic即可生産或消費資料而不必關心資料存于何處)
- Partition:Partition是實體概念上的分區,為了提供系統吞吐率,在實體上每個Topic會分成一個或多個Partition,每個Partition對應一個檔案夾(存儲對應分區的消息内容和索引檔案)。
- Producer:消息生産者,負責生産消息并發送到Kafka Broker。
- Consumer:消息消費者,向Kafka Broker讀取消息并處理的用戶端。
- Consumer Group:每個Consumer屬于一個特定的組(可為每個Consumer指定屬于一個組,若不指定則屬于預設組),組可以用來實作一條消息被組内多個成員消費等功能。
快速入門
在對Kafka有了一些基本了解之後,下面我們來嘗試建構一個Kafka服務端,并體驗一下基于Kafka的消息生産與消費。
環境安裝
首先,我們需要從官網上下載下傳安裝媒體。下載下傳位址為:
http://kafka.apache.org/downloads.html
。本例中采用的版本為:Kafka-0.10.0.1
在解壓Kafka的安裝包之後,可以看到其目錄結構如下:
kafka
+-bin
+-windows
+-config
+-libs
+-logs
+-site-docs
由于Kafka的設計中依賴了ZooKeeper,是以我們可以在
bin
和
config
目錄中除了看到Kafka相關的内容之外,還有ZooKeeper相關的内容。其中
bin
目錄存放了Kafka和ZooKeeper的指令行工具,
bin
根目錄下是适用于Linux/Unix的shell,而
bin/windows
下的則是适用于windows下的bat。我們可以根據實際的系統來設定環境變量,以友善後續的使用和操作。而在
config
目錄中,則是用來存放了關于Kafka與ZooKeeper的配置資訊。
啟動測試
下面我們來嘗試啟動ZooKeeper和Kafka來進行消息的生産和消費。示例中所有的指令均已配置了Kafka的環境變量為例。
- 啟動ZooKeeper,執行指令:
,該指令需要指定zookeeper的配置檔案位置才能正确啟動,kafka的壓縮包中包含了其預設配置,開發與測試環境不需要修改。zookeeper-server-start config/zookeeper.properties
[ ::,] INFO Reading configuration from: config\zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[ ::,] INFO autopurge.snapRetainCount set to (org.apache.zookeeper.server.DatadirCleanupManager)
[ ::,] INFO autopurge.purgeInterval set to (org.apache.zookeeper.server.DatadirCleanupManager)
[ ::,] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[ ::,] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[ ::,] INFO Reading configuration from: config\zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[ ::,] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
...
[ ::,] INFO binding to port /: (org.apache.zookeeper.server.NIOServerCnxnFactory)
從控制台資訊中,我們可以看到ZooKeeper從指定的
config/zookeeper.properties
配置檔案中讀取資訊并綁定2181端口啟動服務。有時候啟動失敗,可檢視一下端口是否被占用,可以殺掉占用程序或通過修改
config/zookeeper.properties
配置檔案中的
clientPort
内容以綁定其他端口号來啟動ZooKeeper。
- 啟動Kafka,執行指令:
,該指令也需要指定Kafka配置檔案的正确位置,如上指令中指向了解壓目錄包含的預設配置。若在測試時,使用外部集中環境的ZooKeeper的話,我們可以在該配置檔案中通過kafka-server-start config/server.properties
參數來設定ZooKeeper的位址和端口,它預設會連接配接本地2181端口的ZooKeeper;如果需要設定多個ZooKeeper節點,可以為這個參數配置多個ZooKeeper位址,并用逗号分割。比如:zookeeper.connect
。zookeeper.connect=127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
- 建立Topic,執行指令:
,通過該指令,建立一個名為“test”的Topic,該Topic包含一個分區一個Replica。在建立完成後,可以使用kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
指令來檢視目前的Topic。kafka-topics --list --zookeeper localhost:2181
另外,如果我們不使用
kafka-topics
指令來手工建立,直接進行下面的内容進行消息建立時也會自動建立Topics來使用。
- 建立消息生産者,執行指令:
。kafka-console-producer --broker-list localhost:9092 --topic test
指令可以啟動Kafka基于指令行的消息生産用戶端,啟動後可以直接在控制台中輸入消息來發送,控制台中的每一行資料都會被視為一條消息來發送。我們可以嘗試輸入幾行消息,由于此時并沒有消費者,是以這些輸入的消息都會被阻塞在名為test的Topics中,直到有消費者将其消費掉位置。kafka-console-producer
- 建立消息消費者,執行指令:
。kafka-console-consumer --zookeeper localhost:2181 --topic test --from-beginning
指令啟動的是Kafka基于指令行的消息消費用戶端,在啟動之後,我們馬上可以在控制台中看到輸出了之前我們在消息生産用戶端中發送的消息。我們可以再次打開之前的消息生産用戶端來發送消息,并觀察消費者這邊對消息的輸出來體驗Kafka對消息的基礎處理。kafka-console-consumer
整合Spring Cloud Bus
在上一篇使用Rabbit實作消息總線的案例中,我們已經通過引入
spring-cloud-starter-bus-amqp
子產品,完成了使用RabbitMQ來實作的消息總線。若我們要使用Kafka來實作消息總線時,隻需要把
spring-cloud-starter-bus-amqp
替換成
spring-cloud-starter-bus-kafka
子產品,在
pom.xml
的dependenies節點中進行修改,具體如下:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
如果我們在啟動Kafka時均采用了預設配置,那麼我們不需要再做任何其他配置就能在本地實作從RabbitMQ到Kafka的切換。我們可以嘗試把剛剛搭建的ZooKeeper、Kafka啟動起來,并将修改為
spring-cloud-starter-bus-kafka
子產品的config-server和config-client啟動起來。
在config-server啟動時,我們可以在控制台中看到如下輸出:
-09- :: INFO --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder : Using kafka topic for outbound: springCloudBus
-09- :: INFO --- [-localhost:] org.I0Itec.zkclient.ZkEventThread : Starting ZkClient event thread.
...
-09- :: INFO --- [ main] o.s.i.kafka.support.ProducerFactoryBean : Using producer properties => {bootstrap.servers=localhost:, linger.ms=, acks=, compression.type=none, batch.size=}
-09- :: INFO --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
...
-09- :: INFO --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : Adding {message-handler:outbound.springCloudBus} as a subscriber to the 'springCloudBusOutput' channel
-09- :: INFO --- [ main] o.s.integration.channel.DirectChannel : Channel 'config-server:7001.springCloudBusOutput' has subscriber(s).
-09- :: INFO --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : started outbound.springCloudBus
...
-09- :: INFO --- [ main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframe[email protected]4178cb34
-09- :: INFO --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$7 : Adding {message-handler:inbound.springCloudBus.anonymous.b9e6c7b-a5-c5-b981-a0d5a30b} as a subscriber to the 'bridge.springCloudBus' channel
-09- :: INFO --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$7 : started inbound.springCloudBus.anonymous.b9e6c7b-a5-c5-b981-a0d5a30b
從控制台的輸出内容,我們可以看到config-server連接配接到了Kafka中,并使用了名為
springCloudBus
的Topic。
此時,我們可以使用
kafka-topics --list --zookeeper localhost:2181
指令來檢視目前Kafka中的Topic,若已成功啟動了config-server并配置正确,我們就可以在Kafka中看到已經多了一個名為
springCloudBus
的Topic。
我們再啟動配置了
spring-cloud-starter-bus-kafka
子產品的config-client,可以看到控制台中輸出如下内容:
-09- :: INFO --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder : Using kafka topic for outbound: springCloudBus
-09- :: INFO --- [-localhost:] org.I0Itec.zkclient.ZkEventThread : Starting ZkClient event thread.
...
-09- :: INFO --- [ main] o.s.i.kafka.support.ProducerFactoryBean : Using producer properties => {bootstrap.servers=localhost:, linger.ms=, acks=, compression.type=none, batch.size=}
-09- :: INFO --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
...
-09- :: INFO --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : Adding {message-handler:outbound.springCloudBus} as a subscriber to the 'springCloudBusOutput' channel
-09- :: INFO --- [ main] o.s.integration.channel.DirectChannel : Channel 'didispace:7002.springCloudBusOutput' has subscriber(s).
-09- :: INFO --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : started outbound.springCloudBus
...
-09- :: INFO --- [ main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframe[email protected]60cf855e
-09- :: INFO --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$7 : Adding {message-handler:inbound.springCloudBus.anonymous.f8fc9c0c-ccd3-dd--98f4ee216} as a subscriber to the 'bridge.springCloudBus' channel
-09- :: INFO --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$7 : started inbound.springCloudBus.anonymous.f8fc9c0c-ccd3-dd--98f4ee216
可以看到,config-client啟動時輸出了類似的内容,他們都訂閱了名為
springCloudBus
的Topic。
在啟動了config-server和config-client之後,為了更明顯地觀察消息總線重新整理配置的效果,我們可以在本地啟動多個不同端口的config-client。此時,我們的config-server以及多個config-client都已經連接配接到了由Kafka實作的消息總線上。我們可以先通路各個config-client上的
/from
請求,檢視他擷取到的配置内容。然後,修改Git中對應的參數内容,再通路各個config-client上的
/from
請求,可以看到配置内容并沒有改變。最後,我們向config-server發送POST請求:
/bus/refresh
,此時我們再去通路各個config-client上的
/from
請求,就能獲得到最新的配置資訊,各用戶端上的配置都已經加載為最新的Git配置内容。
從config-client的控制台中,我們可以看到如下内容:
2016-09-29 08:20:34.361 INFO 21256 --- [ kafka-binder-1] o.s.cloud.bus.event.RefreshListener : Received remote refresh request. Keys refreshed [from]
RefreshListener
監聽類記錄了收到遠端重新整理請求,并重新整理了
from
屬性的日志。
Kafka配置
在上面的例子中,由于Kafka、ZooKeeper均運作于本地,是以我們沒有在測試程式中通過配置資訊來指定Kafka和ZooKeeper的配置資訊,就完成了本地消息總線的試驗。但是我們實際應用中,Kafka和ZooKeeper一般都會獨立部署,是以在應用中都需要來為Kafka和ZooKeeper配置一些連接配接資訊等。Kafka的整合與RabbitMQ不同,在Spring Boot 1.3.7中并沒有直接提供的Starter子產品,而是采用了Spring Cloud Stream的Kafka子產品,是以對于Kafka的配置均采用了
spring.cloud.stream.kafka
的字首,比如:
屬性名 | 說明 | 預設值 |
---|---|---|
spring.cloud.stream.kafka.binder.brokers | Kafka的服務端清單 | localhost |
spring.cloud.stream.kafka.binder.defaultBrokerPort | Kafka服務端的預設端口,當 屬性中沒有配置端口資訊時,就會使用這個預設端口 | 9092 |
spring.cloud.stream.kafka.binder.zkNodes | Kafka服務端連接配接的ZooKeeper節點清單 | localhost |
spring.cloud.stream.kafka.binder.defaultZkPort | ZooKeeper節點的預設端口,當 屬性中沒有配置端口資訊時,就會使用這個預設端口 | 2181 |
更多配置參數請參考官方文檔
本文完整示例:
- 開源中國:http://git.oschina.net/didispace/SpringCloud-Learning/tree/master/Chapter1-1-7
- GitHub:https://github.com/dyc87112/SpringCloud-Learning/tree/master/Chapter1-1-7
作者:程式猿DD
連結:https://www.jianshu.com/p/730d86030a41
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。