天天看點

SpringCloud Stream消息驅動_學習筆記SpringCloud Stream标準流程套路編碼API和常用注解案例消息重複消費的問題持久化

作用:屏蔽底層消息中間件的差異,降低切換成本,統一消息的程式設計模型;

Binder: input–消費之 output—生産者

Stream中的消息通信方式遵循了釋出-訂閱模式

SpringCloud Stream标準流程套路

SpringCloud Stream消息驅動_學習筆記SpringCloud Stream标準流程套路編碼API和常用注解案例消息重複消費的問題持久化

編碼API和常用注解

SpringCloud Stream消息驅動_學習筆記SpringCloud Stream标準流程套路編碼API和常用注解案例消息重複消費的問題持久化

案例

準備:Eureka服務注冊中心;消息生産者;兩個消息消費者;

1.建立消息驅動之生産者

①在pom檔案中添加消息驅動依賴

<dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-eureka-server -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>com.atguigu.springcloud</groupId>
            <artifactId>cloud-api-commons</artifactId>
            <version>${project.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
           

②application.yml

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: #在此處配置要綁定的rabbitmq的服務資訊;
        defaultRabbit: # 表示定義的名稱,用于與binding整合
          type: rabbit  # 消息元件類型
          environment:  # 設定rabbitmq的相關的環境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務的整合處理
        output: # 這個名字是一個通道的名稱
          destination: studyExchange  # 表示要使用的Exchange名稱定義
          content-type: application/json  # 設定消息類型,本次為json,文本則設定"text/plain"
          binder: defaultRabbit     # 設定要綁定的消息服務的具體設定
eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 #設定心跳的時間間隔(預設是30秒)
    lease-expiration-duration-in-seconds: 5 #如果現在超過了5秒的間隔(預設90秒)
    instance-id: send-8801.com # 在資訊清單時顯示主機名稱
    prefer-ip-address: true   # 通路的路徑變為ip位址
           

③寫主啟動類

@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class,args);
    }
}
           

④service及實作類

public interface IMessageProvider {
    public String send();
}
           
@EnableBinding(Source.class)  //定義消息的推送管道
public class MessageProviderImpl implements IMessageProvider {
    @Resource
    private MessageChannel output;  //消息發送管道

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("***********serial:"+serial);
        return null;
    }
}
           

⑤controller

@RestController
public class SendMessageController {
    @Resource
    private IMessageProvider iMessageProvider;

    @GetMapping("/sendMessage")
    public String sendMessage(){
        return iMessageProvider.send();
    }
}
           

測試:啟動Eureka注冊中心,RabbitMQ,以及剛剛寫的生産者

通路:http://localhost:8801/sendMessage,在localhost:15672頁面可以看到資料波峰;

消費者如何獲得消息呢?下面建構消費者

2.建立消費驅動之消費者(消息接收子產品)

①pom和上面的一樣

②application.yml

server:
  port: 8802
spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: #在此處配置要綁定的rabbitmq的服務資訊
        defaultRabbit: # 表示定義的名稱,用于與binding整合
          type: rabbit #設定元件類型
          environment: # 設定rabbitmq的相關的環境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: #服務的整合處理
        input: #這個名字是一個通道的名稱
          destination: studyExchange  # 表示要使用的Exchange名稱定義
          content-type: application/json #設定消息類型,本次為對象json,如果是文本則設定"text/plain"
          binder: defaultRabbit #設定要綁定的消息服務的具體設定

eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 #設定心跳時間間隔(預設30秒)
    lease-expiration-duration-in-seconds: 5 #如果現在超過了5秒的時間間隔(預設90秒)
    instance-id: send-8802.com # 在資訊清單時顯示主機名稱
    prefer-ip-address: true   # 通路的路徑變為ip位址
           

③主啟動類

@SpringBootApplication
public class StreamMQMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8802.class,args);
    }
}
           

④業務

由于它是個消費者,是以直接寫在controller中

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("消費者1号,--------->接收到的消息::"+message.getPayload()+"\t port:"+serverPort);
    }
}
           

測試:通路http://localhost:8801/sendMessage,表示服務生産者發送消息,觀察消費者控制台,可以列印消息,證明消費者可以收到消息了

消息重複消費的問題

**原理:**微服務應用放置于同一個group中,就能保證消息隻會被其中一個應用消費一次。不同的組是可以重複消費的,同一個組内會發生競争關系,隻有其中一個可以消費.

1.将兩個服務都變成不同的組

SpringCloud Stream消息驅動_學習筆記SpringCloud Stream标準流程套路編碼API和常用注解案例消息重複消費的問題持久化

像上面的這種配置,将兩個微服務配置設定到了兩個不同的組(盡管他們在同一個應用中),這樣會出現消息被重複消費的現象.

為了解決這個問題,SpringCloud Stream提供了

消費組

的概念.

2.實作不重複消費

實作輪詢分組,每次隻有一個消費者.消息發送者發的消息隻能被其中一個消費者接收到,這樣就避免了消息的重複消費;

是以,将兩個消費者都變成相同組,group兩個相同—将兩個微服務的application.yml中group的值設為相同

SpringCloud Stream消息驅動_學習筆記SpringCloud Stream标準流程套路編碼API和常用注解案例消息重複消費的問題持久化

持久化

自己了解:分組的服務可以接受之前錯過的消息;未分組的接收不到錯過的消息

繼續閱讀