源碼:Chapter 8 Event-driven architecture with Spring Cloud Stream
要實作的功能是:現有兩個微服務,組織服務A,和許可證服務B,B用redis儲存A的緩存,B産生變化的時候,用消息隊列通知A資料已經變化,請重新整理緩存。
一、編寫簡單的消息生産者和消費者
1.在組織服務裡編寫消息生産者
組織服務pom依賴:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
這個就是告訴Spring Cloud Stream将服務綁定到消息代理,該服務将通過Source類上定義的一組通道與消息代理進行通信。Spring Cloud Steam上有一個預設的通道集,可以配置它們來與消息代理進行通信。但是現在還沒說它要綁到哪個消息代理。
到這裡還沒提到要用哪個消息中間件,接下來就把它綁到kafka吧~
在這裡簡單貼一下Zookeeper和kafka的基本知識(學習的時候,可以先黑箱再白箱):簡單了解一下kafka的叢集結構。然後試着把它搭建出來,Docker比較友善,首先保證有Java環境。
然後,幾條指令就可以搭建好了:
#Zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
#kafka 記得改成自己的IP
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.2.149 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:latest
#進kafka配置
docker exec -ti kafka /bin/bash
#路徑 然後按配置文檔操作就行了
cd opt/kafka_2.12-2.3.0/
#對了,記得開端口,Zookeeper是2181,kafka是9092
firewall-cmd --add-port=2181/tcp --permanent
firewall-cmd --add-port=9092/tcp --permanent
firewall-cmd --reload
到現在為止,綁定已經完成,然後看一下消息的實際釋出在哪裡:
小筆記:在消息裡要放多少資料呢?比如是隻放一個id,還是把一整個表的改動都放進去呢?隻放一個ID的意思是,我通知你有變化了,你去源頭拉取資料驗證吧。全部放的意思是,直接從消息隊列裡就可以拿到資料。這樣一看,後者少了一次請求,似乎更好?實際上不是這樣的,後者不能保證資料是最新的,實際上在消息隊列裡,消息可能是很久之前的,而且資料可以不按順序進行檢索。是以,還是溯源一下比較好。
2.在許可證服務中編寫消息消費者
pom依賴不多說。
引導類有點不同。
因為消費者隻需要監聽就行了,是以:
消費者組的概念:可能有多個服務,一個服務可能有多個執行個體,把它們編個組,那麼就隻讓其中一個執行個體接收消息就可以了,意思就是小組裡的人都受到了一條重新整理的消息,讓一個人去通知redis重新整理就行了。
3.在實際操作中檢視消息服務。
更改Zookeeper和kafka的host配置為實際位址,依次開啟Eureka->配置服務->許可證服務->組織服務->路由服務。
然後put以下内容:
{
"contactEmail":"[email protected]",
"contactName":"week",
"contactPhone":"123456789",
"id":"e254f8c-c442-4ebe-a82a-e2fc1d1ff78a",
"name":"customer-crm-co"
}
put之後,在組織微服務就看到了一個UPDATE:
許可證服務也收到了一條消息:
二、Spring Cloud Stream用例:分布式緩存
剛才的驗證階段,許可證服務實際上是報錯了的,是因為還沒有配置Redis,這個問題放在這節講述。
這裡先用docker裝一下redis:
#安裝
docker run -p 6379:6379 -d redis:latest redis-server
#開端口
firewall-cmd --add-port=6379/tcp --permanent
firewall-cmd --reload
1.使用redis來查找緩存。
(1)pom依賴
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.7.4.RELEASE</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.0</version>
</dependency>
(2)構造一個到Redis伺服器的資料庫連接配接
設定在這裡:
(3)定義Spring Data Redis存儲庫
接口實作:
(4)使用Redis和許可證服務來存儲和讀取組織資料
驗證一下:
運作第二次就會發現:
用redisclient也能看到:
2.定義自定義通道
我們之前用的是input和output的預設通道,現在來自定義一個:
而output是這樣的:
@OutputChannel(outboudOrg)
MessageChannel outboudOrg();
然後許可證服務的配置要改成這樣:
再處理一下監聽處理器:
以上~