作用:屏蔽底層消息中間件的差異,降低切換成本,統一消息的程式設計模型;
Binder: input–消費之 output—生産者
Stream中的消息通信方式遵循了釋出-訂閱模式
SpringCloud Stream标準流程套路
編碼API和常用注解
案例
準備:Eureka服務注冊中心;消息生産者;兩個消息消費者;
1.建立消息驅動之生産者
①在pom檔案中添加消息驅動依賴
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-eureka-server -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.atguigu.springcloud</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
②application.yml
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: #在此處配置要綁定的rabbitmq的服務資訊;
defaultRabbit: # 表示定義的名稱,用于與binding整合
type: rabbit # 消息元件類型
environment: # 設定rabbitmq的相關的環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
output: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設定消息類型,本次為json,文本則設定"text/plain"
binder: defaultRabbit # 設定要綁定的消息服務的具體設定
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 #設定心跳的時間間隔(預設是30秒)
lease-expiration-duration-in-seconds: 5 #如果現在超過了5秒的間隔(預設90秒)
instance-id: send-8801.com # 在資訊清單時顯示主機名稱
prefer-ip-address: true # 通路的路徑變為ip位址
③寫主啟動類
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class,args);
}
}
④service及實作類
public interface IMessageProvider {
public String send();
}
@EnableBinding(Source.class) //定義消息的推送管道
public class MessageProviderImpl implements IMessageProvider {
@Resource
private MessageChannel output; //消息發送管道
@Override
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("***********serial:"+serial);
return null;
}
}
⑤controller
@RestController
public class SendMessageController {
@Resource
private IMessageProvider iMessageProvider;
@GetMapping("/sendMessage")
public String sendMessage(){
return iMessageProvider.send();
}
}
測試:啟動Eureka注冊中心,RabbitMQ,以及剛剛寫的生産者
通路:http://localhost:8801/sendMessage,在localhost:15672頁面可以看到資料波峰;
消費者如何獲得消息呢?下面建構消費者
2.建立消費驅動之消費者(消息接收子產品)
①pom和上面的一樣
②application.yml
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: #在此處配置要綁定的rabbitmq的服務資訊
defaultRabbit: # 表示定義的名稱,用于與binding整合
type: rabbit #設定元件類型
environment: # 設定rabbitmq的相關的環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: #服務的整合處理
input: #這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json #設定消息類型,本次為對象json,如果是文本則設定"text/plain"
binder: defaultRabbit #設定要綁定的消息服務的具體設定
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 #設定心跳時間間隔(預設30秒)
lease-expiration-duration-in-seconds: 5 #如果現在超過了5秒的時間間隔(預設90秒)
instance-id: send-8802.com # 在資訊清單時顯示主機名稱
prefer-ip-address: true # 通路的路徑變為ip位址
③主啟動類
@SpringBootApplication
public class StreamMQMain8802 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8802.class,args);
}
}
④業務
由于它是個消費者,是以直接寫在controller中
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("消費者1号,--------->接收到的消息::"+message.getPayload()+"\t port:"+serverPort);
}
}
測試:通路http://localhost:8801/sendMessage,表示服務生産者發送消息,觀察消費者控制台,可以列印消息,證明消費者可以收到消息了
消息重複消費的問題
**原理:**微服務應用放置于同一個group中,就能保證消息隻會被其中一個應用消費一次。不同的組是可以重複消費的,同一個組内會發生競争關系,隻有其中一個可以消費.
1.将兩個服務都變成不同的組
像上面的這種配置,将兩個微服務配置設定到了兩個不同的組(盡管他們在同一個應用中),這樣會出現消息被重複消費的現象.
為了解決這個問題,SpringCloud Stream提供了
消費組
的概念.
2.實作不重複消費
實作輪詢分組,每次隻有一個消費者.消息發送者發的消息隻能被其中一個消費者接收到,這樣就避免了消息的重複消費;
是以,将兩個消費者都變成相同組,group兩個相同—将兩個微服務的application.yml中group的值設為相同
持久化
自己了解:分組的服務可以接受之前錯過的消息;未分組的接收不到錯過的消息