天天看點

【筆記】SpringCloud(5)之Spring Cloud Bus(消息總線)、Spring Cloud Stream(消息驅動)、Spring Cloud Sleuth(分布式服務跟蹤)Spring Cloud BusSpring Cloud StreamSpring Cloud Sleuth

Spring Cloud Bus

  • Spring Cloud Bus
    • 結合RabbitMQ
      • RabbitMQ前置知識
      • 使用
      • 原理
      • 指定重新整理範圍
      • 架構優化
    • kafka實作消息總線
      • kafka知識
  • Spring Cloud Stream
    • 使用(rabbitmq)
  • Spring Cloud Sleuth
    • 初始化
    • 原理
    • 抽樣收集
    • 抽樣收集整合

Spring Cloud Bus

通常使用輕量級的消息代理來建構一個公用的消息主題,讓系統中所有的微服務執行個體都連接配接上來,由于該主題中産生的消息會被所有的執行個體監聽和消費,是以稱它為消息總線;

結合RabbitMQ

RabbitMQ前置知識

【筆記】SpringCloud(5)之Spring Cloud Bus(消息總線)、Spring Cloud Stream(消息驅動)、Spring Cloud Sleuth(分布式服務跟蹤)Spring Cloud BusSpring Cloud StreamSpring Cloud Sleuth
【筆記】SpringCloud(5)之Spring Cloud Bus(消息總線)、Spring Cloud Stream(消息驅動)、Spring Cloud Sleuth(分布式服務跟蹤)Spring Cloud BusSpring Cloud StreamSpring Cloud Sleuth

使用

  1. 建立spring boot工程 ,命名為rabbitmq-hello
  2. pom引入依賴 spring-boot-starter-amqp
  3. 配置rabbitmq的連接配接資訊
  4. 使用參考demo

原理

【筆記】SpringCloud(5)之Spring Cloud Bus(消息總線)、Spring Cloud Stream(消息驅動)、Spring Cloud Sleuth(分布式服務跟蹤)Spring Cloud BusSpring Cloud StreamSpring Cloud Sleuth

指定重新整理範圍

  1. 通過向服務執行個體請求Spring Cloud Bus的/bus/refresh接口,進而實作總線上其他服務執行個體的/refresh
  2. 如果要重新整理某個具體執行個體的配置:/bus/refresh接口提供了一個destination參數來指定要重新整理的應用程式(/bus/refresh?destination=customers:9000)

架構優化

【筆記】SpringCloud(5)之Spring Cloud Bus(消息總線)、Spring Cloud Stream(消息驅動)、Spring Cloud Sleuth(分布式服務跟蹤)Spring Cloud BusSpring Cloud StreamSpring Cloud Sleuth
  1. Config Server引入Spring Cloud Bus 将配置服務端加入到消息總線中來;
  2. /bus/refresh請求不再發送到具體服務執行個體上,而是發送給Config Server 并通過destination參數來指定要更新配置的服務或執行個體;

kafka實作消息總線

kafka知識

【筆記】SpringCloud(5)之Spring Cloud Bus(消息總線)、Spring Cloud Stream(消息驅動)、Spring Cloud Sleuth(分布式服務跟蹤)Spring Cloud BusSpring Cloud StreamSpring Cloud Sleuth

Spring Cloud Stream

使用(rabbitmq)

  1. 建立一個基礎的Spring Boot 工程,命名為stream-hello
  2. 添加依賴spring-cloud-starter-stream-rabbit
  3. 建立用于接收來自RabbitMQ消息的消費者SinkReceiver
@EnableBinding(value = {SinkSender.class})
public class SinkReceiver {
   private static Logger logger = LoggerFactory.getLogger(DemoApplication.class);

    /**
     * 消息通道的消費者
     * @param payload
     */
   @StreamListener(Sink.INPUT)
   public void receive(Object payload){
       logger.info("received: " + payload);
   }
}
           
  1. @EnableBinding 該注解用來指定一個或多個定義了@Input或Output注解的接口,以此實作對消息通道(Channel)的綁定;
public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}
           
  1. @StreamListener 主要定義在方法上,作用是将被修飾的方法注冊為消息中間件上資料流的事件監聽器,注解中的屬性值對于了監聽的消息通道名;
  2. 配置通道的主題
spring.cloud.stream.bindings.input.destination=output
spring.cloud.stream.bindings.output.destination=input
           
  1. 如果在同一個主題上的應用需要啟動多個執行個體的時候,我們可以通過spring.cloud.stream.bindings.input.group屬性為應用指定一個組名,這樣這個應用的多個執行個體在接收到消息的時候,隻會有一個成員真正收到消息并進行處理;
  2. 沒有為應用指定消費組的時候,Spring Cloud Stream 會為其配置設定一個獨立的匿名消費組。如果同一主題下的所有應用都沒有指定消費組的時候,當有消息釋出之後,所有的應用都會對其進行消費,因為它們屬于一個獨立的組。是以最好為其指定一個消費組。
  3. 消息分區:當生産者将消息資料發送給多個消費者執行個體時,保證擁有共同特征的消息資料始終是由同一個消費者執行個體接收和處理;
  4. @Input和@Output定義消息通道, 注入綁定接口,如上述代碼所示;
  5. @StreamListener與@ServiceActivator(Spring Intergation的注解)相比,第一個注解功能更加強大,後者沒有轉換功能,不能将JSON或XML專成對象等;
  6. 消息回報 :很多時候在處理完消息之後,需要回報一個消息給對方,這時可通過@SendTo注解來指定傳回内容的輸出通道;
@EnableScheduling
@EnableBinding(value = {Processor.class})
public class App1 {

   private static Logger logger =  LoggerFactory.getLogger(App1.class);

   //@StreamListener(Processor.INPUT)
   //@SendTo(Processor.OUTPUT)
   @ServiceActivator(inputChannel = Processor.INPUT,outputChannel = Processor.OUTPUT)
   public Object receiveFromInput(Object payLoad){
       logger.info("APP1接收到消息-"+ payLoad);
       return "傳回-"+payLoad;
   }

}

@EnableBinding(value = {Processor.class})
public class App2 {

    private static final Logger logger = Logger.getLogger("Process");

    @Bean
    @InboundChannelAdapter(value = Processor.OUTPUT,poller = @Poller(fixedDelay = "2000"))
    public MessageSource<Date> timerMessageSource(){
        return ()->new GenericMessage<>(new Date());
    }
    @StreamListener(Processor.INPUT)
    public void receiveFromOut(Object payload){
        logger.info("APP2接收:"+payload);
    }
}
           
  1. 響應式程式設計可使用RxJava,即引入spring-cloud-straeam-rxjava
  2. 消費組與消費分區
#消費組
spring.cloud.stream.bindings.input.group=
#消費主題
spring.cloud.stream.bindings.input.destination=output
#開啟分區
spring.cloud.stream.bindings.input.consumer.partitioned=true
#指定消費者的總執行個體數
spring.cloud.stream.instance-count=1
#目前執行個體的索引号
spring.cloud.stream.instance-index=1
#生産者也可以進行一些配置等
#生産者主題
spring.cloud.stream.bindings.output.destination=input
#生産者 分區鍵
spring.cloud.stream.bindings.output.producer.partition-key-expression=
#指定消息分區的數量
spring.cloud.stream.bindings.output.producer.partition-count=
           
  1. 綁定器SPI :涵蓋了一套可插拔的用于連接配接外部中間件的實作機制 ,最為關鍵的是Binder接口,一個典型的Binder綁定器實作
    1. 一個實作Binder接口的類’;
    2. 一個Spring配置加載類,用于建立連接配接消息中間件的基礎結構使用的執行個體;
    3. 一個或多個能夠在classpath下的META-INF/sping.binders路徑下找到的綁定器定義檔案,該檔案一般存儲了目前綁定器使用的自動化配置類的路徑;
  2. 綁定器自動化配置
    1. spring-cloud-stream-binder-rabbit
    2. spring-cloud-stream-binder-kafka
  3. 多綁定器配置
    【筆記】SpringCloud(5)之Spring Cloud Bus(消息總線)、Spring Cloud Stream(消息驅動)、Spring Cloud Sleuth(分布式服務跟蹤)Spring Cloud BusSpring Cloud StreamSpring Cloud Sleuth

Spring Cloud Sleuth

初始化

  1. 引入Spring-cloud-starter-sleuth元件
  2. 預設參數(A1,A2,A3,A4)

    - A1: 應用的名稱(Spring.application.name);

    - A2:TreceID(用來辨別一條請求鍊路,一條請求鍊路包含一個TraceID,多個SpanID);

    - A3:SpanID(表示一個基本的工作單元,比如發送一個http請求);

    - false , 表四是否将該資訊輸出到Zipkin等服務中來收集和展示;

原理

  1. 分布式系統的入口,服務跟蹤架構為請求建立一個唯一的跟蹤辨別(TreceID);
  2. 當請求到達各個元件時,或是處理邏輯到達某個狀态時,也通過唯一辨別來标記它的開始,具體過程和結果(SpanID);

抽樣收集

預設10% 收集 :spring.sleuth.sampler.percentage=0.1

抽樣收集整合

  • logstash
  • zipkin
  • http
  • 消息中間件