實作生産者發送消息,消費者接收消息的demo
在你的項目中加入
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
注意:如果你是使用的 spring initialzar 加入的spring cloud stream子產品,如果項目中沒有這個依賴,也需要加入上面這個依賴。
根據規劃,我們需要兩個服務,消息消費者和消息生産者,是以需要建構兩個子產品。
對于消息消費者子產品,在配置檔案中寫入如下配置:
# 消息隊列相關
spring.rabbitmq.username=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.password=guest
spring.rabbitmq.host=172.17.0.3
spring.rabbitmq.port=5672
# cloud stream 相關
spring.cloud.stream.bindings.input.destination=teststream
spring.cloud.stream.bindings.input.group=stream_receiver
然後實作消息的接收,代碼如下:
@EnableBinding(value = {Sink.class, Source.class})
public class StreamReceiver{
@Value("${spring.profiles.active:0}")
private String active;
Logger logger = LoggerFactory.getLogger(StreamReceiver.class);
@StreamListener(Sink.INPUT)
public void messageReceive(@Payload ProjectDto dto, @Headers Map headers) {
logger.info("執行個體{}", active);
logger.info("項目名:{},項目id:{}",dto.getInfo(), dto.getPrjId());
}
}
這裡 綁定的是具有消息通道的接口,cloud Stream預設提供 Sink/Source/Process 三個不同的接口,其中 Sink提供input接口,Source提供output接口。 表示監聽這個通道。至于
public class ProjectDto implements Serializable {
private int prjId ;
private String info;
public String getInfo() {
return info;
}
public int getPrjId() {
return prjId;
}
public void setInfo(String info) {
this.info = info;
}
public void setPrjId(int prjId) {
this.prjId = prjId;
}
}
然後來寫生産者,pom.xml跟消費者是相同的,配置檔案中加入:
# 消息隊列相關
spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=172.17.0.3
spring.rabbitmq.port=5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
# springcloud stream
## rabbitmq exchange
spring.cloud.stream.bindings.output.destination=teststream
## rabbitmq queue 在測試消費組的時候去掉這裡的group
spring.cloud.stream.bindings.output.group=stream_receiver
我這裡為了友善測試,将消息生産者寫成了 web 模式,提供rest接口,是以需要引入依賴包:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger2 -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.2.2</version>
</dependency>
然後修改自動生成的主程式:
@EnableAutoConfiguration
@Configuration
@ServletComponentScan
@ComponentScan({"com.xinzu.stream1"})
public class Stream1Application {
public static void main(String[] args) {
SpringApplication.run(Stream1Application.class, args);
}
}
并定義消息發送的服務:
@Service
@EnableBinding(Source.class)
public class SendService {
@Autowired
private Source source;
public void sendMessage(String ans, int pojid){
try {
ProjectDto dto = new ProjectDto();
dto.setInfo(ans);
dto.setPrjId(pojid);
// Consts.setProjectId();
source.output().send(MessageBuilder.withPayload(dto).build());
}catch (Exception e){
e.printStackTrace();
}
}
}
如果你是采用的idea作為ide,可以source會說沒有bean,而被标紅,但是實際上編譯是沒有任何問題的,這裡你可以降低idea标紅的級别,進而增強程式設計體驗。
寫一個控制器來調用該服務:
@Api(value = "測試", tags = {"測試"})
@RestController
public class TestStreamSender{
@Autowired
private SendService service;
@RequestMapping(value = "/teststream", method = RequestMethod.POST)
@ResponseBody
public Integer testStream(@RequestParam("val") final String val,
@RequestParam("pojid") final Integer projid){
try {
for (int j=0;j<4;j++){
for (int i=projid;i<projid+50;i++){
String ans = val;
int number = (int) (( Math.random()*1000)%1000);
ans = ans + number;
service.sendMessage(ans,i);
Thread.sleep(500);
}
}
}catch (Exception e){
return 0;
}
return 1;
}
}
最後再給出swagger的配置:
@Configuration
@EnableSwagger2
public class Swagger2 {
@Bean
public Docket createRestApi() {
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.select()
.apis(RequestHandlerSelectors.basePackage("com.xinzu.stream1"))
.paths(PathSelectors.any())
.build();
}
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
.title("springcloudsender")
.description("springcloudsender api")
.termsOfServiceUrl("http://www.chaojilaji.com/")
.contact("chaojilaji")
.version("1.0")
.build();
}
}
然後分别打開生産者和消費者的一個執行個體,檢驗執行結果
開啟生産者服務效果:
開啟消費者服務效果:
生成的queue:
在swagger ui上面進行測試:
結果:
一個簡單的消息發送和接收就寫好了
對消息序列化和反序列化
其實上面的代碼中已經使用到了消息的序列化和反序列化,隻不過沒有進行仔細說明,首先定義一個pojo并實作序列化
public class ProjectDto implements Serializable {
private int prjId ;
private String info;
public String getInfo() {
return info;
}
public int getPrjId() {
return prjId;
}
public void setInfo(String info) {
this.info = info;
}
public void setPrjId(int prjId) {
this.prjId = prjId;
}
}
然後,在消費者接收消息時,用
@StreamListener(Sink.INPUT)
public void messageReceive(@Payload ProjectDto dto, @Headers Map headers) {
logger.info("執行個體{}", active);
logger.info("項目名:{},項目id:{}",dto.getInfo(), dto.getPrjId());
}
實作消息的序列化和反序列化是消息分區的基礎
驗證消費組
啟動多個消費者執行個體,并分成兩個組。如下
在主配置檔案中寫入:
spring.profiles.active=1
spring.profiles.active=2
spring.profiles.active=3
spring.profiles.active=4
spring.profiles.active=5
spring.profiles.active=6
然後建立6個不同的配置檔案,注意命名不能發生變化:
在前三個個配置檔案中均寫入以下内容:
# 消息隊列相關
spring.rabbitmq.username=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.password=guest
spring.rabbitmq.host=172.17.0.3
spring.rabbitmq.port=5672
# cloud stream 相關
spring.cloud.stream.bindings.input.destination=stream
spring.cloud.stream.bindings.input.group=stream_receiver_1
## 消費組
spring.profiles.active=
在後三個配置檔案中寫入
# 消息隊列相關
spring.rabbitmq.username=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.password=guest
spring.rabbitmq.host=172.17.0.3
spring.rabbitmq.port=5672
# cloud stream 相關
spring.cloud.stream.bindings.input.destination=stream
spring.cloud.stream.bindings.input.group=stream_receiver_2
##
spring.profiles.active=
然後進入到主配置檔案,即
通過一個個的去掉注釋,打開6個執行個體
在idea中如果需要打開多個執行個體,需要在edit服務裡面去掉勾選以單例模式運作的選項即可
下面是6個消費者開啟日志
來看看rabbitmq中的效果:
兩個不同的 組生成了兩個不同的隊列,是以我們可以猜到,cloud stream實作消費者組就是通過生成不同的消息隊列來實作的。然後由此我們知道,應該修改生産者的組,不把生産者定義到任何組,隻要把消息發送到 stream這個交換機就可以了,是以實作消費者組的功能,修改生産者,隻需要将它的組設定成不和任何一個消費者組一樣就行,這裡我直接去掉。
## rabbitmq queue 在測試消費組的時候去掉這裡的group
#spring.cloud.stream.bindings.output.group=stream_receiver
打開生産者,驗證消費者組内每個消息都隻有一個執行個體被消費,且全部執行個體消費的消息與消息總集合相等
stream_receiver_1:
stream_receiver_2:
上述驗證都十分正确,但是有個奇怪的現象,兩個不同的分組,項目分布完全一緻,這裡估計還需進行仔細研究。
驗證分區
分區最重要的功能就是處于一個組裡面的項目根據關鍵字進行連續配置設定。這種場景是在統計需求或者耗時項目需求中需要實作的場景。
修改消費者配置檔案
# 消息隊列相關
spring.rabbitmq.username=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.password=guest
spring.rabbitmq.host=172.17.0.3
spring.rabbitmq.port=5672
# cloud stream 相關
spring.cloud.stream.bindings.input.destination=streamfenqu
spring.cloud.stream.bindings.input.group=stream_receiver
##
spring.profiles.active=
##
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.instance-count=3
# 這個是分區執行個體的編号,執行個體編号不能重複
spring.cloud.stream.instance-index=
我這裡配置了三個分區執行個體,是以index應該是 0/1/2
然後修改生産者的配置檔案
spring.cloud.stream.bindings.output.destination=streamfenqu
## rabbitmq queue 在測試消費組的時候去掉這裡的group
#spring.cloud.stream.bindings.output.group=stream_receiver
## 分區
spring.cloud.stream.bindings.output.producer.partition-key-expression=payload.prjId
spring.cloud.stream.bindings.output.producer.partition-count=3
在生産者這裡定義 partition-key-expression 為 payload.prjId。如果你還記得序列化那裡,我們用來修飾 ProjectDto的注釋是 ,那麼恭喜你,這裡的 payload.prjId 就是指代的 ProjectDto.prjId。不過這裡唯一的不便捷性就是針對該生産者發出的消息,所有的分區都會采用該 特征作為分區的基礎,需要研究其擴充性。然後分别打開三個消費者執行個體(和消費者那裡一樣,一個一個的注釋打開)和一個生産者。并進行驗證: