SpringCloud Stream
SpringCloud Config
SpringCloud Gatewa
SpringCloud Hystrix
SpringCloud 第一部分
文章目錄
-
- SpringCloud Stream
-
- MQ相關術語
- 相關注解
- 重複消費問題
- 消息持久化
技術興起的原因:為了解決系統中不同中間件的适配問題,出現了cloud stream,采用适配綁定的方式,自動給不同的MQ之間進行切換。
屏蔽底層消息中間件的差異,降低切換成本,統一消息的程式設計模型。

官方定義Spring Cloud Stream是一個建構消息驅動微服務的架構。
應用程式通過inputs(消費者)或者outputs(生産者)來與Spring Cloud Stream中binder對象互動。通過我們配置來綁定,而Spring Cloud Stream的binder對象負責與消息中間件互動。
Spring Cloud Stream為一些供應商的消息中間件産品提供了個性化的自動配置,引用了釋出、訂閱、消費、分區的三個核心概念。
官方版本目前僅僅支援RabbitMQ和Kafka。
MQ相關術語
Message:生産者/消費者之間靠消息媒介傳遞資訊内容
MessageChannel:消息必須走特定的通道
消息通道的子接口SubscribableChannel,由MessageHandle消息處理器所訂閱。
相關注解
- Middleware:中間件,目前隻支援RabbitMQ和Kafka
- Binder:應用層和消息中間件之間的封裝,實作了Kafka和RabbitMQ的Binder,通過Binder可以很友善的連接配接中間件,可以動态的改變消息類型,這些可以通過配置檔案修改。
- Input:表示輸入通道,消息進入該通道傳到應用程式。
- Output:注解辨別輸出通道,釋出的消息将通過該通道離開應用程式。
- StreamListener:監聽隊列,用于消費者的隊列的消息接收。
- EnableBinding:将信道channel和exchange綁定在一起。
首先建立一個provider,服務提供者rabbitmq-provider8801
導入依賴
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--基礎配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
編寫配置檔案application.yml
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此處配置要綁定的rabbitmq的服務資訊;
defaultRabbit: # 表示定義的名稱,用于于binding整合
type: rabbit # 消息元件類型
environment: # 設定rabbitmq的相關的環境配置
spring:
rabbitmq:
host: 192.168.31.52 #rabbitmq服務啟動所在機器的IP位址
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
output: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設定消息類型,本次為json,文本則設定“text/plain”
binder: defaultRabbit # 設定要綁定的消息服務的具體設定
eureka:
client: # 用戶端進行Eureka注冊的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
instance-id: send-8801.com # 在資訊清單時顯示主機名稱
prefer-ip-address: true # 通路的路徑變為IP位址
編寫一個發送資料的接口
IMessageProvider
public interface IMessageProvider {
String sendMessage();
}
接口的實作類
IMessageProviderImpl
@EnableBinding(Source.class) //定義消息的推送管道
public class IMessageProviderImpl implements IMessageProvider
{
@Resource
private MessageChannel output; // 消息發送管道
@Override
public String sendMessage()
{
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*****serial: "+serial);
return null;
}
}
controller層下的SendMessageController
@RestController
public class SendMessageController {
@Autowired
private IMessageProvider iMessageProvider;
@GetMapping(value = "/sendMessage")
public String send(){
return iMessageProvider.sendMessage();
}
}
啟動Eureka7001,啟動服務提供者8801.啟動虛拟機上的RabbitMQ
記得把虛拟機防火牆關了。
[[email protected] bin]$ systemctl stop firewalld
[[email protected] bin]$ systemctl status firewalld
然後測試一下服務提供者是否正常運作。
發送請求:http://localhost:8801/sendMessage
控制台輸出UUID。
然後再建立一個服務消費者,在MQ的另一端進行消費消息。
建立另一個子產品,cloud-stream-rabbitmq-consumer8802
導入依賴
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--基礎配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
和上一個服務提供者的依賴一樣。
寫配置檔案application.yml
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此處配置要綁定的rabbitmq的服務資訊;
defaultRabbit: # 表示定義的名稱,用于于binding整合
type: rabbit # 消息元件類型
environment: # 設定rabbitmq的相關的環境配置
spring:
rabbitmq:
host: 192.168.31.52
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
input: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設定消息類型,本次為對象json,如果是文本則設定“text/plain”
binder: defaultRabbit # 設定要綁定的消息服務的具體設定
eureka:
client: # 用戶端進行Eureka注冊的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
instance-id: receive-8802.com # 在資訊清單時顯示主機名稱
prefer-ip-address: true # 通路的路徑變為IP位址
建立一個消費者的
ReceiveMessageController
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("message = "+message.getPayload()+"\t"+"serverPort= "+serverPort);
}
}
如果消費者成功接收消息,則在控制台輸出産生的UUID和端口号。
啟動Eureka7001,啟動服務提供者8801,啟動服務消費者8802,還有MQ。
在Eureka中可以看到兩個服務已經啟動。
每次請求http://localhost:8801/sendMessage;消費者都能輸出結果,輸出的UUID與提供者的一緻。
登入RabbitMQ的web管理,可以看到我們建立的exchange,并且可以檢視消息隊列中的請求次數的情況。
發送的消息除了可以是字元串類型還可以發送對象,在消費者接受資料的時候,會将實體轉換成JSON字元串。
配置檔案中,如果你使用的消息中間件是kafka,type: kafka;environment是設定消息中間件的配置資訊,端口,主機位址,使用者名,密碼等,可以設定多個binder,适應不同的場景。
重複消費問題
預設情況下,每個消費者的分組名都是随機的,不同的,對于不同的組會引起重複消費的問題,例如:消息提供者隻向消息隊列中發送了一個消息,正常情況下,消費者A從隊列中拿走之後,消費者B不能再獲得相同的消息,但是由于AB是不同的組,是以A和B都會擷取相同的消息,這就導緻了資源被重複消費。
微服務應用放置到同一個group中,就能夠保證消息隻會被其中應用消費一次,不同的組是可以消費的,同一個組内會發生競争關系,隻有其中一個可以消費。
同一個應用的不同微服務,隻用在配置檔案中指定相同的group。
再次發送消息時,隻有消費者其中一個能消費。避免了重複消費。
消息持久化
當兩個消費者A和B,A設定了group屬性值,B沒有設定,這時,消費者全部當機,但是消息生産者一直響MQ中生産消息,這時候重新開機A和B兩者有什麼差別呢?
正因為B沒有這時分組,B再次啟動後不會再向MQ中取資料,而A啟動成功後可以正常消費消息隊列中的消息。
是以設定了group的消費者,可以保證消息隊列中的消息持久化,group對于消費者來講很重要,既能避免重複消費,又能在消費者重新開機後依然可以消費消息隊列中未消費的消息。