天天看點

使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)

源碼:Chapter 8 Event-driven architecture with Spring Cloud Stream

要實作的功能是:現有兩個微服務,組織服務A,和許可證服務B,B用redis儲存A的緩存,B産生變化的時候,用消息隊列通知A資料已經變化,請重新整理緩存。

一、編寫簡單的消息生産者和消費者

1.在組織服務裡編寫消息生産者

組織服務pom依賴:

<dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
           
使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)

這個就是告訴Spring Cloud Stream将服務綁定到消息代理,該服務将通過Source類上定義的一組通道與消息代理進行通信。Spring Cloud Steam上有一個預設的通道集,可以配置它們來與消息代理進行通信。但是現在還沒說它要綁到哪個消息代理。

使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)

到這裡還沒提到要用哪個消息中間件,接下來就把它綁到kafka吧~

使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)

在這裡簡單貼一下Zookeeper和kafka的基本知識(學習的時候,可以先黑箱再白箱):簡單了解一下kafka的叢集結構。然後試着把它搭建出來,Docker比較友善,首先保證有Java環境。

然後,幾條指令就可以搭建好了:

#Zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
#kafka 記得改成自己的IP
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.2.149 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:latest
#進kafka配置
docker exec -ti kafka /bin/bash
#路徑 然後按配置文檔操作就行了
cd opt/kafka_2.12-2.3.0/
#對了,記得開端口,Zookeeper是2181,kafka是9092
firewall-cmd --add-port=2181/tcp --permanent 
firewall-cmd --add-port=9092/tcp --permanent
firewall-cmd --reload 
           

到現在為止,綁定已經完成,然後看一下消息的實際釋出在哪裡:

使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)

小筆記:在消息裡要放多少資料呢?比如是隻放一個id,還是把一整個表的改動都放進去呢?隻放一個ID的意思是,我通知你有變化了,你去源頭拉取資料驗證吧。全部放的意思是,直接從消息隊列裡就可以拿到資料。這樣一看,後者少了一次請求,似乎更好?實際上不是這樣的,後者不能保證資料是最新的,實際上在消息隊列裡,消息可能是很久之前的,而且資料可以不按順序進行檢索。是以,還是溯源一下比較好。

2.在許可證服務中編寫消息消費者

pom依賴不多說。

引導類有點不同。

使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)

因為消費者隻需要監聽就行了,是以:

使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)

消費者組的概念:可能有多個服務,一個服務可能有多個執行個體,把它們編個組,那麼就隻讓其中一個執行個體接收消息就可以了,意思就是小組裡的人都受到了一條重新整理的消息,讓一個人去通知redis重新整理就行了。

3.在實際操作中檢視消息服務。

更改Zookeeper和kafka的host配置為實際位址,依次開啟Eureka->配置服務->許可證服務->組織服務->路由服務。

然後put以下内容:

{
	"contactEmail":"[email protected]",
	"contactName":"week",
	"contactPhone":"123456789",
	"id":"e254f8c-c442-4ebe-a82a-e2fc1d1ff78a",
	"name":"customer-crm-co"
}
           
使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)

put之後,在組織微服務就看到了一個UPDATE:

使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)

許可證服務也收到了一條消息:

使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)

二、Spring Cloud Stream用例:分布式緩存

剛才的驗證階段,許可證服務實際上是報錯了的,是因為還沒有配置Redis,這個問題放在這節講述。

這裡先用docker裝一下redis:

#安裝
docker run -p 6379:6379 -d redis:latest redis-server
#開端口
firewall-cmd --add-port=6379/tcp --permanent
firewall-cmd --reload
           

1.使用redis來查找緩存。

(1)pom依賴

<dependency>
      <groupId>org.springframework.data</groupId>
      <artifactId>spring-data-redis</artifactId>
      <version>1.7.4.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.9.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
      <version>2.0</version>
    </dependency>
           
(2)構造一個到Redis伺服器的資料庫連接配接
使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)

設定在這裡:

使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)
(3)定義Spring Data Redis存儲庫
使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)

接口實作:

使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)
(4)使用Redis和許可證服務來存儲和讀取組織資料
使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)
使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)

驗證一下:

運作第二次就會發現:

使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)

用redisclient也能看到:

使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)

2.定義自定義通道

我們之前用的是input和output的預設通道,現在來自定義一個:

使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)

而output是這樣的:

@OutputChannel(outboudOrg)
MessageChannel outboudOrg();
           

然後許可證服務的配置要改成這樣:

使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)

再處理一下監聽處理器:

使用Spring Cloud Stream的事件驅動架構(利用kafka和redis,學習筆記)

以上~

繼續閱讀