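1. Add the dependency to the project's pom.xml (no version is given, so it is assumed to be managed elsewhere, for example by the spring-cloud-dependencies BOM):
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>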
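2. Define the message channels:
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface Demo {
    /**
     * Name of the channel used to send messages
     */
    String DEMO_OUTPUT = "demo_output";
    /**
     * Name of the channel used to subscribe to messages
     */
    String DEMO_INPUT = "demo_input";
    /**
     * Channel for sending messages
     *
     * @return the output channel
     */
    @Output(DEMO_OUTPUT)
    MessageChannel sendDemoMessage();
    /**
     * Channel for receiving messages
     *
     * @return the input channel
     */
    @Input(DEMO_INPUT)
    SubscribableChannel receiveDemoMessage();
}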
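3. Enable the binding of the targets annotated with @Input and @Output to the broker, according to the list of interfaces passed as the annotation's value: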
@EnableBinding(value = {Demo.class})
4. Configure the Kafka connection:
spring.cloud.stream.bindings.demo_input.destination=demo
spring.cloud.stream.bindings.demo_input.group=demo
spring.cloud.stream.bindings.demo_output.destination=demo
spring.cloud.stream.bindings.demo_output.group=demo
spring.cloud.stream.default-binder=kafka
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
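The binding keys above follow the pattern spring.cloud.stream.bindings.<channelName>.destination, where the channel name is the value passed to @Input or @Output and the destination is the Kafka topic. The following is only an illustrative sketch in plain Java, not how Spring resolves bindings internally; the class BindingDestinations and its methods are hypothetical names introduced for this example. It reads the binding lines and lists which channel maps to which topic:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical helper, not part of Spring: lists channel -> topic pairs. */
class BindingDestinations {
    private static final String PREFIX = "spring.cloud.stream.bindings.";
    private static final String SUFFIX = ".destination";

    /** Extracts the channel name from each "...bindings.<channel>.destination" key. */
    static Map<String, String> destinations(Properties props) {
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            boolean longEnough = key.length() > PREFIX.length() + SUFFIX.length();
            if (longEnough && key.startsWith(PREFIX) && key.endsWith(SUFFIX)) {
                String channel = key.substring(PREFIX.length(), key.length() - SUFFIX.length());
                result.put(channel, props.getProperty(key));
            }
        }
        return result;
    }

    /** Loads a few of the binding lines from step 4 into a Properties object. */
    static Properties sampleProperties() {
        String text = String.join("\n",
                "spring.cloud.stream.bindings.demo_input.destination=demo",
                "spring.cloud.stream.bindings.demo_input.group=demo",
                "spring.cloud.stream.bindings.demo_output.destination=demo");
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Not expected for an in-memory reader
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Print one "channel -> topic" line per binding
        destinations(sampleProperties())
                .forEach((channel, topic) -> System.out.println(channel + " -> " + topic));
    }
}
```

Both demo_input and demo_output point at the same topic, demo, which is why a message sent on the output channel in this tutorial arrives back at the listener on the input channel.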
5. Send a message:
@Resource(name = Demo.DEMO_OUTPUT)
private MessageChannel sendDemoMessageChannel;
@Test
public void Demo() {
    // send(...) returns true if the message was sent successfully
    boolean isSendSuccess = sendDemoMessageChannel.send(MessageBuilder.withPayload("OK").build());
    System.out.println(isSendSuccess);
}
6. Receive messages:
@StreamListener(Demo.DEMO_INPUT)
public void insertQuotationK(Message<String> message) {
    if (StringUtils.isEmpty(message.getPayload())) {
        System.out.println("received data is empty!");
        System.out.println("400 failed");
        // Stop here so an empty payload is not reported as received
        return;
    }
    System.out.println("Kafka received: " + message.getPayload());
}
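To try the send and receive logic from steps 5 and 6 without a running broker, the flow can be imitated in plain Java. This is a rough sketch under loud assumptions: InMemoryChannel and RoundTripDemo are hypothetical stand-ins invented for this example, delivery here is synchronous and in-process, and none of it reflects how Spring Cloud Stream or Kafka deliver messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for a bound channel pair; not Spring's implementation. */
class InMemoryChannel {
    private final List<Consumer<String>> listeners = new ArrayList<>();

    /** Plays the role of registering a listener method on the input channel. */
    void subscribe(Consumer<String> listener) {
        listeners.add(listener);
    }

    /** Simplified send: true if the payload was handed to at least one listener. */
    boolean send(String payload) {
        if (listeners.isEmpty()) {
            return false;
        }
        listeners.forEach(listener -> listener.accept(payload));
        return true;
    }
}

class RoundTripDemo {
    /** Same guard as the listener in step 6: reject empty payloads, otherwise report the payload. */
    static String handle(String payload) {
        if (payload == null || payload.isEmpty()) {
            return "400 failed";
        }
        return "Kafka received: " + payload;
    }

    public static void main(String[] args) {
        InMemoryChannel channel = new InMemoryChannel();
        List<String> log = new ArrayList<>();
        // Record what the handler reports for each delivered payload
        channel.subscribe(payload -> log.add(handle(payload)));

        boolean sent = channel.send("OK");
        System.out.println(sent);
        log.forEach(System.out::println);
    }
}
```

Keeping the payload check in a small method such as handle makes it possible to unit-test the empty-payload branch separately from the messaging infrastructure.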
7. That's all. If you run into any exceptions, leave a comment.