天天看點

SpringCloud系列之SpringCloud Stream

SpringCloud Stream

SpringCloud Config

SpringCloud Gatewa

SpringCloud Hystrix

SpringCloud 第一部分

文章目錄

    • SpringCloud Stream
      • MQ相關術語
      • 相關注解
      • 重複消費問題
      • 消息持久化

技術興起的原因:為了解決系統中不同中間件的适配問題,出現了cloud stream,采用适配綁定的方式,自動給不同的MQ之間進行切換。

屏蔽底層消息中間件的差異,降低切換成本,統一消息的程式設計模型。

SpringCloud系列之SpringCloud Stream

官方定義Spring Cloud Stream是一個建構消息驅動微服務的架構。

應用程式通過inputs(消費者)或者outputs(生産者)來與Spring Cloud Stream中binder對象互動。通過我們配置來綁定,而Spring Cloud Stream的binder對象負責與消息中間件互動。

Spring Cloud Stream為一些供應商的消息中間件産品提供了個性化的自動配置,引用了釋出、訂閱、消費、分區的三個核心概念。

官方版本目前僅僅支援RabbitMQ和Kafka。

MQ相關術語

Message:生産者/消費者之間靠消息媒介傳遞資訊内容

MessageChannel:消息必須走特定的通道

消息通道的子接口SubscribableChannel,由MessageHandle消息處理器所訂閱。

相關注解

  • Middleware:中間件,目前隻支援RabbitMQ和Kafka
  • Binder:應用層和消息中間件之間的封裝,實作了Kafka和RabbitMQ的Binder,通過Binder可以很友善的連接配接中間件,可以動态的改變消息類型,這些可以通過配置檔案修改。
  • Input:表示輸入通道,消息進入該通道傳到應用程式。
  • Output:注解辨別輸出通道,釋出的消息将通過該通道離開應用程式。
  • StreamListener:監聽隊列,用于消費者的隊列的消息接收。
  • EnableBinding:将信道channel和exchange綁定在一起。

首先建立一個provider,服務提供者rabbitmq-provider8801

導入依賴

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!--基礎配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
           

編寫配置檔案application.yml

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務資訊;
        defaultRabbit: # 表示定義的名稱,用于于binding整合
          type: rabbit # 消息元件類型
          environment: # 設定rabbitmq的相關的環境配置
            spring:
              rabbitmq:
                host: 192.168.31.52 #rabbitmq服務啟動所在機器的IP位址
                port: 5672
                username: guest 
                password: guest
      bindings: # 服務的整合處理
        output: # 這個名字是一個通道的名稱
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設定消息類型,本次為json,文本則設定“text/plain”
          binder: defaultRabbit # 設定要綁定的消息服務的具體設定

eureka:
  client: # 用戶端進行Eureka注冊的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
    instance-id: send-8801.com  # 在資訊清單時顯示主機名稱
    prefer-ip-address: true     # 通路的路徑變為IP位址


           

編寫一個發送資料的接口

IMessageProvider

public interface IMessageProvider {
    String sendMessage();
}
           

接口的實作類

IMessageProviderImpl

@EnableBinding(Source.class) //定義消息的推送管道
public class IMessageProviderImpl implements IMessageProvider
{
    @Resource
    private MessageChannel output; // 消息發送管道

    @Override
    public String sendMessage()
    {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("*****serial: "+serial);
        return null;
    }
}
           

controller層下的SendMessageController

@RestController
public class SendMessageController {

    @Autowired
    private IMessageProvider iMessageProvider;

    @GetMapping(value = "/sendMessage")
    public String send(){
       return  iMessageProvider.sendMessage();
    }
}

           

啟動Eureka7001,啟動服務提供者8801.啟動虛拟機上的RabbitMQ

SpringCloud系列之SpringCloud Stream

記得把虛拟機防火牆關了。

[[email protected] bin]$ systemctl stop firewalld
[[email protected] bin]$ systemctl status firewalld
           

然後測試一下服務提供者是否正常運作。

發送請求:http://localhost:8801/sendMessage

SpringCloud系列之SpringCloud Stream

控制台輸出UUID。

然後再建立一個服務消費者,在MQ的另一端進行消費消息。

建立另一個子產品,cloud-stream-rabbitmq-consumer8802

導入依賴

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--基礎配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
           

和上一個服務提供者的依賴一樣。

寫配置檔案application.yml

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務資訊;
        defaultRabbit: # 表示定義的名稱,用于于binding整合
          type: rabbit # 消息元件類型
          environment: # 設定rabbitmq的相關的環境配置
            spring:
              rabbitmq:
                host: 192.168.31.52
                port: 5672
                username: guest
                password: guest
      bindings: # 服務的整合處理
        input: # 這個名字是一個通道的名稱
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設定消息類型,本次為對象json,如果是文本則設定“text/plain”
          binder: defaultRabbit # 設定要綁定的消息服務的具體設定



eureka:
  client: # 用戶端進行Eureka注冊的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
    instance-id: receive-8802.com  # 在資訊清單時顯示主機名稱
    prefer-ip-address: true     # 通路的路徑變為IP位址

           

建立一個消費者的

ReceiveMessageController

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageController {

    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void  input(Message<String> message){
        System.out.println("message = "+message.getPayload()+"\t"+"serverPort= "+serverPort);
    }
}

           

如果消費者成功接收消息,則在控制台輸出産生的UUID和端口号。

啟動Eureka7001,啟動服務提供者8801,啟動服務消費者8802,還有MQ。

在Eureka中可以看到兩個服務已經啟動。

SpringCloud系列之SpringCloud Stream

每次請求http://localhost:8801/sendMessage;消費者都能輸出結果,輸出的UUID與提供者的一緻。

SpringCloud系列之SpringCloud Stream
SpringCloud系列之SpringCloud Stream

登入RabbitMQ的web管理,可以看到我們建立的exchange,并且可以檢視消息隊列中的請求次數的情況。

SpringCloud系列之SpringCloud Stream
SpringCloud系列之SpringCloud Stream

發送的消息除了可以是字元串類型還可以發送對象,在消費者接受資料的時候,會将實體轉換成JSON字元串。

配置檔案中,如果你使用的消息中間件是kafka,type: kafka;environment是設定消息中間件的配置資訊,端口,主機位址,使用者名,密碼等,可以設定多個binder,适應不同的場景。

重複消費問題

預設情況下,每個消費者的分組名都是随機的,不同的,對于不同的組會引起重複消費的問題,例如:消息提供者隻向消息隊列中發送了一個消息,正常情況下,消費者A從隊列中拿走之後,消費者B不能再獲得相同的消息,但是由于AB是不同的組,是以A和B都會擷取相同的消息,這就導緻了資源被重複消費。

微服務應用放置到同一個group中,就能夠保證消息隻會被其中應用消費一次,不同的組是可以消費的,同一個組内會發生競争關系,隻有其中一個可以消費。

同一個應用的不同微服務,隻用在配置檔案中指定相同的group。

SpringCloud系列之SpringCloud Stream
SpringCloud系列之SpringCloud Stream

再次發送消息時,隻有消費者其中一個能消費。避免了重複消費。

消息持久化

當兩個消費者A和B,A設定了group屬性值,B沒有設定,這時,消費者全部當機,但是消息生産者一直響MQ中生産消息,這時候重新開機A和B兩者有什麼差別呢?

正因為B沒有這時分組,B再次啟動後不會再向MQ中取資料,而A啟動成功後可以正常消費消息隊列中的消息。

是以設定了group的消費者,可以保證消息隊列中的消息持久化,group對于消費者來講很重要,既能避免重複消費,又能在消費者重新開機後依然可以消費消息隊列中未消費的消息。