天天看點

Springboot最簡單的實戰介紹 整合kafka-生産者與消費者(消息推送與訂閱擷取)

Kafka是什麼,如果你還不了解這個中間件,那麼先看看這個(關于介紹kafka的),

好了,下面我們開始整合:

首先,先往pom.xml檔案添加Kafka的依賴,

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.0.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>      

然後,接下來是配置檔案(以下提供properties格式,yml格式 ,供大家随便自取), 

當然注釋也是要好好看看的,畢竟都一字字敲的。

application.properties

#============== kafka ===================
# 指定kafka 代理位址,可以多個
#spring.kafka.bootstrap-servers=123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094
spring.kafka.bootstrap-servers=192.168.x.xxx:9092
#=============== producer生産者  =======================

spring.kafka.producer.retries=0
# 每次批量發送消息的數量
spring.kafka.producer.batch-size=16384
# 緩存容量
spring.kafka.producer.buffer-memory=33554432

# 指定消息key和消息體的編解碼方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer消費者  =======================
# 指定預設消費者group id
spring.kafka.consumer.group-id=test-app

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100ms

# 指定消息key和消息體的編解碼方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#spring.kafka.consumer.bootstrap-servers=192.168.8.111:9092
#spring.kafka.consumer.zookeeper.connect=192.168.8.103:2181
#指定tomcat端口
server.port=8063      

application.yml:

spring:
  # KAFKA
  kafka:
    # ָkafka伺服器位址,可以指定多個
    bootstrap-servers: 123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094
    #=============== producer生産者配置 =======================
    producer:
      retries: 0
      # 每次批量發送消息的數量
      batch-size: 16384
      # 緩存容量
      buffer-memory: 33554432
      # ָ指定消息key和消息體的編解碼方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #=============== consumer消費者配置  =======================
    consumer:
      #指定預設消費者的group id
      group-id: test-app
      #earliest
      #當各分區下有已送出的offset時,從送出的offset開始消費;無送出的offset時,從頭開始消費
      #latest
      #當各分區下有已送出的offset時,從送出的offset開始消費;無送出的offset時,消費新産生的該分區下的資料
      #none
      #topic各分區都存在已送出的offset時,從offset後開始消費;隻要有一個分區不存在已送出的offset,則抛出異常
      auto-offset-reset: latest
      enable-auto-commit: true
      auto-commit-interval: 100ms
      #指定消費key和消息體的編解碼方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer      

好了,配置工作準備完畢。

我們先來搞Kafka的生産者,也就是負責推送消息的子產品:

 建立一個類, 叫KafkaSender(注解不能少,留意代碼),

package com.kafkademo.producer;


import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.sun.scenario.effect.impl.sw.sse.SSEBlend_SRC_OUTPeer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;


/**
 * Hello!
 * Created By  JCccc on 2018/11/24
 * 11:25
 */
@Component

public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    private final Logger logger = LoggerFactory.getLogger(KafkaSender.class);


    public void send(String topic, String taskid, String jsonStr) {


        //發送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, taskid, jsonStr);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            //推送成功
            public void onSuccess(SendResult<String, Object> result) {
                logger.info(topic + " 生産者 發送消息成功:" + result.toString());


            }

            @Override
            //推送失敗
            public void onFailure(Throwable ex) {
                logger.info(topic + " 生産者 發送消息失敗:" + ex.getMessage());


            }
        });


    }


}      

以上就是kafka生産者了,到此刻,你已經可以開始往kafka伺服器推送消息了

事不宜遲,我們立馬試試:

 建立個controller,搞個接口試試推送下消息,

@GetMapping("/sendMessageToKafka")
public  String sendMessageToKafka() {
    Map<String,String> messageMap=new HashMap();
    messageMap.put("message","我是一條消息");
    String taskid="123456";
    String jsonStr=JSONObject.toJSONString(messageMap);
//kakfa的推送消息方法有多種,可以采取帶有任務key的,也可以采取不帶有的(不帶時預設為null)
    kafkaSender.send("testTopic",taskid,jsonStr);

    return "hi guy!";


}      

用postman測一下(對了,這些推送的前提是你的kafka伺服器是沒問題的,能正常連接配接)

Springboot最簡單的實戰介紹 整合kafka-生産者與消費者(消息推送與訂閱擷取)

看看控制台反應: 

Springboot最簡單的實戰介紹 整合kafka-生産者與消費者(消息推送與訂閱擷取)

可以看到,我們的kafka生産者再推送消息成功後,成功進入了我們的回調函數onSuccess,也列印了日志。

沒錯,你已經掌握kafak生産者了,你已經掌握推送消息了。 那麼接下來,我們繼續搞下kafka的消費者。

我們創一個類,叫KafkaConsumer (同樣,注意看代碼,注解不能少) :

package com.kafkademo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;


import org.springframework.stereotype.Component;
import org.springframework.stereotype.Controller;


import javax.servlet.http.HttpSession;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


/**
 * Hello!
 * Created By  JCccc on 2018/11/24
 * 13:13
 */
@Component
public class KafkaConsumer  {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    
//下面的主題是一個數組,可以同時訂閱多主題,隻需按數組格式即可,也就是用“,”隔開
    @KafkaListener(topics = {"testTopic"})
    public void receive(ConsumerRecord<?, ?> record){

        logger.info("消費得到的消息---key: " + record.key());
        logger.info("消費得到的消息---value: " + record.value().toString());
    }

}      

好,到此,kafka的消費者就這麼簡單完成了。

 那麼,我們接下來驗證下,生産者推送消息到主題“testTopic”,消費者訂閱主題“testTopic”,把消息消費下來:

一樣,用postman來模拟下第三方調用接口,

Springboot最簡單的實戰介紹 整合kafka-生産者與消費者(消息推送與訂閱擷取)

我們看看控制台,

 沒錯,kafka生産者跟剛剛一樣,成功把消息推送到了主題testTopic上去了,回調函數OnSuccess列印了相關日志;

           而,我們的kafka消費者,也是很有效率,再檢測到自己訂閱的主題testTopic有消息,立馬消費了下來。

Springboot最簡單的實戰介紹 整合kafka-生産者與消費者(消息推送與訂閱擷取)

好了,springboot整合kafka,生産者、消費者就是這麼輕松簡單結束了。

當然了,該篇案例的生産者和消費者都放在了一個demo去介紹了,實際上大家使用的是按照業務場景,資料量去選擇是否需要分開生産者項目&消費者項目,哪些是同時有生産者和消費者的身份的,哪些是隻有生産者身份的,哪些是隻有消費者身份的。

繼續閱讀