天天看點

kafka介紹及使用

一、MAC環境下安裝啟動kafka

1、安裝kafka

brew install kafka複制代碼
           

安裝詳情

kafka介紹及使用

安裝詳情裡面包含一些使用介紹,主要包括幾個部分:

kafka介紹及使用

安裝kafka前預設安裝了zookeeper,說明kafka依賴zookeeper,為什麼依賴,下一部分會講到。

kafka介紹及使用

這部分介紹了zookeeper和kafka的啟動指令,要麼用brew services start指令設定自啟或重新開機(macOS 使用launchtl指令加載開機自動運作的服務,brew services是launchctl的一個子集),或者直接使用工具自帶的指令啟動。

2、啟動kafka

啟動kafka

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties複制代碼
           

建立topic

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test複制代碼
           

檢視所有topic

kafka-topics --list --zookeeper localhost:2181複制代碼
           

生産者發送消息

kafka-console-producer --broker-list localhost:9092 --topic test
>第一條消息
>第二條消息複制代碼
           

消費者消費消息 

kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
第一條消息
           

二、kafka原理介紹

kafka介紹及使用

Producer:消息生産者。

Broker:kafka叢集中的伺服器。

Topic:消息的主題,可以了解為消息的分類,kafka的資料就儲存在topic。在每個broker上都可以建立多個topic。

Partition:Topic的分區,每個topic可以有多個分區,分區的作用是做負載,提高kafka的吞吐量。

Replication:每一個分區都有多個副本,副本的作用是做備胎。當主分區(Leader)故障的時候會選擇一個備胎(Follower)上位,成為Leader。在kafka中預設副本的最大數量是10個,且副本的數量不能大于Broker的數量,follower和leader絕對是在不同的機器,同一機器對同一個分區也隻可能存放一個副本(包括自己)。

Consumer:消息消費者。

Consumer Group:我們可以将多個消費組組成一個消費者組,在kafka的設計中同一個分區的資料隻能被消費者組中的某一個消費者消費。同一個消費者組的消費者可以消費同一個topic的不同分區的資料,這也是為了提高kafka的吞吐量!

Zookeeper:kafka叢集依賴zookeeper來儲存叢集的的元資訊,來保證系統的可用性。

1、生産消息

kafka介紹及使用

kafka的資料,實際上是以檔案的形式存儲在檔案系統的。topic下有partition,partition下有segment,segment是實際的一個個檔案,topic和partition都是抽象概念。在目錄/${topicName}-{$partitionid}/下,存儲着實際的log檔案(即segment),還有對應的索引檔案。每個segment檔案大小相等,檔案名以這個segment中最小的offset命名,檔案擴充名是.log;segment對應的索引的檔案名字一樣,擴充名是.index。

kafka介紹及使用

2、消費消息

訂閱topic是以一個消費組來訂閱的,一個消費組裡面可以有多個消費者。同一個消費組中的兩個消費者,不會同時消費一個partition。換句話來說,就是一個partition,隻能被消費組裡的一個消費者消費,但是可以同時被多個消費組消費。是以,如果消費組内的消費者如果比partition多的話,那麼就會有個别消費者一直空閑。

一個消費組消費partition,需要儲存offset記錄消費到哪,以前儲存在zk中,由于zk的寫性能不好,以前的解決方法都是consumer每隔一分鐘上報一次。這裡zk的性能嚴重影響了消費的速度,而且很容易出現重複消費。

在0.10版本後,kafka把這個offset的儲存,從zk總剝離,儲存在一個名叫__consumeroffsets topic的topic中。寫進消息的key由groupid、topic、partition組成,value是偏移量offset。topic配置的清理政策是compact。總是保留最新的key,其餘删掉。一般情況下,每個key的offset都是緩存在記憶體中,查詢的時候不用周遊partition,如果沒有緩存,第一次就會周遊partition建立緩存,然後查詢傳回。

kafka介紹及使用

3、kafka中消息具體是怎麼被存儲的

Kafka以Partition作為存儲單元,一個partition是一個有序的,不變的消息隊列,消息總是被追加到尾部。一個partition不能被切分成多個散落在多個broker上或者多個磁盤上。

Partition是由多個Segment組成,當Kafka要寫資料到一個partition時,它會寫入到狀态為active的segment中。如果該segment被寫滿,則一個新的segment将會被建立,然後變成新的"active" segment。Segment以該segment的base offset作為自己的名稱。

kafka介紹及使用

在磁盤上,一個partition就是一個目錄,然後每個segment由一個index檔案和一個log檔案組成。如下:

kafka介紹及使用

Segment下的log檔案就是存儲消息的地方,每個消息都會包含消息體、offset、timestamp、key、size、壓縮編碼器、校驗和、消息版本号等。在磁盤上的資料格式和producer發送到broker的資料格式一模一樣,也和consumer收到的資料格式一模一樣。由于磁盤格式與consumer以及producer的資料格式一模一樣,這樣就使得Kafka可以通過零拷貝(zero-copy)技術來提高傳輸效率。

Segment下的index負責映射消息offset到某個消息在log檔案中的位置。如下:

kafka介紹及使用

索引檔案是記憶體映射(memory mapped)的,offset查找使用二分查找來查找小于或等于目标offset的最近offset。

索引檔案由8個位元組的條目組成,4個位元組用來存儲相對于base offset的偏移量,另外4個位元組用來存儲position。這個偏移量由于是相對于base offset的,是以隻需要4個位元組來存儲。比如base offset是10000000000000000000,那麼接下來就不用存儲為10000000000000000001 和10000000000000000002了,而是僅存儲為1和2。

Kafka存儲内部檔案工作總結:

• Partition被分成多個segment。

• Segment包含兩個檔案:index和log檔案。

• Index負責映射每個offset到消息的在log檔案中的具體位置,主要用來查找消息。

• Indexes 儲存的是目前segment的base offset的相對偏移量。

• 壓縮消息批量發送是被包裝一個wrapper message來發送。

• 儲存在磁盤上的資料格式和broker從producer收到的以及發送給consumer的資料格式一模一樣,這樣就能夠實作領拷貝(zero-copy)。

摘自cloud.tencent.com/developer/a…

4、kafka為什麼會依賴zookeeper

1、在Kafka的設計中,選擇了使用Zookeeper來進行所有Broker的管理,展現在zookeeper上會有一個專門用來進行Broker伺服器清單記錄的點,節點路徑為/brokers/ids 每個Broker伺服器在啟動時,都會到Zookeeper上進行注冊,即建立/brokers/ids/[0-N]的節點,然後寫入IP,端口等資訊,Broker建立的是臨時節點,所有一旦Broker上線或者下線,對應Broker節點也就被删除了,是以我們可以通過zookeeper上Broker節點的變化來動态表征Broker伺服器的可用性,Kafka的Topic也類似于這種方式。

2、生産者負載均衡。生産者需要将消息合理的發送到分布式Broker上,這就面臨如何進行生産者負載均衡問題。 對于生産者的負載均衡,Kafka支援傳統的4層負載均衡,zookeeper同時也支援zookeeper方式來實作負載均衡。 (1)傳統的4層負載均衡 根據生産者的IP位址和端口來為其定一個相關聯的Broker,通常一個生産者隻會對應單個Broker,隻需要維護單個TCP連結。這樣的方案有很多弊端,因為在系統實際運作過程中,每個生産者生成的消息量,以及每個Broker的消息存儲量都不一樣,那麼會導緻不同的Broker接收到的消息量非常不均勻,而且生産者也無法感覺Broker的新增與删除。 (2)使用zookeeper進行負載均衡很簡單,生産者通過監聽zookeeper上Broker節點感覺Broker,Topic的狀态,變更,來實作動态負載均衡機制,當然這個機制Kafka已經結合zookeeper實作了。

3、消費者的負載均衡和生産負載均衡類似

4、記錄消息分區于消費者的關系,都是通過建立修改zookeeper上相應的節點實作

5、記錄消息消費進度Offset記錄,都是通過建立修改zookeeper上相應的節點實作 。

摘自blog.csdn.net/u011311291/…

更詳細解釋請參考www.jianshu.com/p/a036405f9…

三、spring-boot-kafka對接

maven依賴

<dependency>   
    <groupId>org.springframework.kafka</groupId>  
    <artifactId>spring-kafka</artifactId>   
    <version>2.2.7.RELEASE</version>
</dependency>複制代碼
           

properties配置

spring.kafka.producer.bootstrap-servers=192.168.41.140:9092
spring.kafka.consumer.bootstrap-servers=192.168.41.140:9092
spring.kafka.consumer.group-id=kafka
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer複制代碼
           

1、生産消息

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

/**
 * kafka生産者
 *
 * @author blackupper
 * @version $Id: KafkaProducer, v0.1
 * @company 
 * @date 2019年08月02日 9:57 AM blackupper Exp $ */
@Component
@Slf4j
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String msg){
        log.info("send data:{}, {}", topic, msg);
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, msg);
        future.addCallback(
                success -> log.info("KafkaMessageProducer 發送消息成功!"),
                fail -> log.error("KafkaMessageProducer 發送消息失敗!"));
    }
}複制代碼
           
import io.swagger.annotations.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * 測試controller
 *
 * @author blackupper
 * @version $Id: KafkaSendController, v0.1
 * @company 
 * @date 2019年08月02日 10:02 AM blackupper Exp $
 */
@RestController
@Slf4j
@Api(description = "kafka測試接口")
public class KafkaSendController {
    @Autowired
    private KafkaProducer kafkaProducer;

    @ApiOperation(value = "發送消息")
    @RequestMapping(value = "/send", method = RequestMethod.GET)
    @ResponseBody
    public void queryBalance(
            @ApiParam(value = "topic", name = "topic") @RequestParam(value = "topic") String topic,
            @ApiParam(value = "消息内容", name = "msg") @RequestParam(value = "msg") String msg) {
        kafkaProducer.send(topic, msg);
    }
}
複制代碼
           

2、消費消息

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

/**
 * kafka消費者
 *
 * @author blackupper
 * @version $Id: KafKaConsumer, v0.1
 * @company 
 * @date 2019年08月02日 10:34 AM blackupper Exp $ */
@Component
@Slf4j
public class KafKaConsumer {
    @KafkaListener(id = "kafka", topicPartitions = {@TopicPartition(topic = "test1", partitions = { "0", "1" })})
    public void listen (ConsumerRecord<?, ?> record) {
        log.info("start consume");
        log.info("topic-{}, offset-{}, value-{}", record.topic(), record.offset(), record.value());
    }
}複制代碼
           
kafka介紹及使用

3、生産者常用調用方式

ListenableFuture<SendResult<K, V>> sendDefault(V data);複制代碼
           

KafkaTemplate中有defaultTopic這個屬性,當調用sendDefault方法時,kafka會自動把消息發送到defaultTopic屬性指定的topic中。

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);複制代碼
           

将消息發送到指定的topic和partition中

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);複制代碼
           

将消息發送到指定的topic和partition中,并在消息上帶上時間戳

ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);複制代碼
           

将消息内容封裝成ProducerRecord進行發送

其實上述幾個方法,最終都是分裝成ProducerRecord,調用doSend方法傳遞消息的,我們下面看下doSend方法的源碼:

protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        if (this.transactional) {
            Assert.state(inTransaction(),
                    "No transaction is in process; "
                            + "possible solutions: run the template operation within the scope of a "
                            + "template.executeInTransaction() operation, start a transaction with @Transactional "
                            + "before invoking the template method, "
                            + "run in a transaction started by a listener container when consuming a record");
        }
        final Producer<K, V> producer = getTheProducer();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sending: " + producerRecord);
        }
        final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
        producer.send(producerRecord, buildCallback(producerRecord, producer, future));
        if (this.autoFlush) {
            flush();
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sent: " + producerRecord);
        }
        return future;
    }複制代碼
           

從上述代碼可以看到,doSend内部首先判斷是否開啟了事務,然後調用KafkaProducer的send方法發送消息,SettableListenableFuture接收傳回值,SettableListenableFuture實作了ListenableFuture接口,ListenableFuture則實作了Future接口,Future是Java自帶的實作異步程式設計的接口,支援傳回值的異步。由此可見上述的幾個方法都是異步發送消息的。如果想要同步擷取結果,可以調用Future的get方法,該方法會阻塞直到任務傳回結果。

4、@KafkaListener屬性詳解

@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {

    /**
     * The unique identifier of the container managing for this endpoint.
     * <p>If none is specified an auto-generated one is provided.
     * <p>Note: When provided, this value will override the group id property
     * in the consumer factory configuration, unless {@link #idIsGroup()}
     * is set to false.
     * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
     * @return the {@code id} for the container managing for this endpoint.
     * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
     */
    String id() default "";

    /**
     * The bean name of the {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
     * to use to create the message listener container responsible to serve this endpoint.
     * <p>If not specified, the default container factory is used, if any.
     * @return the container factory bean name.
     */
    String containerFactory() default "";

    /**
     * The topics for this listener.
     * The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
     * An expression must be resolved to the topic name.
     * <p>
     * Mutually exclusive with {@link #topicPattern()} and {@link #topicPartitions()}.
     * @return the topic names or expressions (SpEL) to listen to.
     */
    String[] topics() default {};

    /**
     * The topic pattern for this listener. The entries can be 'topic pattern', a
     * 'property-placeholder key' or an 'expression'. The framework will create a
     * container that subscribes to all topics matching the specified pattern to get
     * dynamically assigned partitions. The pattern matching will be performed
     * periodically against topics existing at the time of check. An expression must
     * be resolved to the topic pattern (String or Pattern result types are supported).
     * <p>
     * Mutually exclusive with {@link #topics()} and {@link #topicPartitions()}.
     * @return the topic pattern or expression (SpEL).
     * @see org.apache.kafka.clients.CommonClientConfigs#METADATA_MAX_AGE_CONFIG
     */
    String topicPattern() default "";

    /**
     * The topicPartitions for this listener.
     * <p>
     * Mutually exclusive with {@link #topicPattern()} and {@link #topics()}.
     * @return the topic names or expressions (SpEL) to listen to.
     */
    TopicPartition[] topicPartitions() default {};

    /**
     * If provided, the listener container for this listener will be added to a bean
     * with this value as its name, of type {@code Collection<MessageListenerContainer>}.
     * This allows, for example, iteration over the collection to start/stop a subset
     * of containers.
     * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
     * @return the bean name for the group.
     */
    String containerGroup() default "";

    /**
     * Set an {@link org.springframework.kafka.listener.KafkaListenerErrorHandler} bean
     * name to invoke if the listener method throws an exception.
     * @return the error handler.
     * @since 1.3
     */
    String errorHandler() default "";

    /**
     * Override the {@code group.id} property for the consumer factory with this value
     * for this listener only.
     * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
     * @return the group id.
     * @since 1.3
     */
    String groupId() default "";

    /**
     * When {@link #groupId() groupId} is not provided, use the {@link #id() id} (if
     * provided) as the {@code group.id} property for the consumer. Set to false, to use
     * the {@code group.id} from the consumer factory.
     * @return false to disable.
     * @since 1.3
     */
    boolean idIsGroup() default true;

    /**
     * When provided, overrides the client id property in the consumer factory
     * configuration. A suffix ('-n') is added for each container instance to ensure
     * uniqueness when concurrency is used.
     * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
     * @return the client id prefix.
     * @since 2.1.1
     */
    String clientIdPrefix() default "";

    /**
     * A pseudo bean name used in SpEL expressions within this annotation to reference
     * the current bean within which this listener is defined. This allows access to
     * properties and methods within the enclosing bean.
     * Default '__listener'.
     * <p>
     * Example: {@code topics = "#{__listener.topicList}"}.
     * @return the pseudo bean name.
     * @since 2.1.2
     */
    String beanRef() default "__listener";

    /**
     * Override the container factory's {@code concurrency} setting for this listener. May
     * be a property placeholder or SpEL expression that evaluates to a {@link Number}, in
     * which case {@link Number#intValue()} is used to obtain the value.
     * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
     * @return the concurrency.
     * @since 2.2
     */
    String concurrency() default "";

    /**
     * Set to true or false, to override the default setting in the container factory. May
     * be a property placeholder or SpEL expression that evaluates to a {@link Boolean} or
     * a {@link String}, in which case the {@link Boolean#parseBoolean(String)} is used to
     * obtain the value.
     * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
     * @return true to auto start, false to not auto start.
     * @since 2.2
     */
    String autoStartup() default "";

    /**
     * Kafka consumer properties; they will supersede any properties with the same name
     * defined in the consumer factory (if the consumer factory supports property overrides).
     * <h3>Supported Syntax</h3>
     * <p>The supported syntax for key-value pairs is the same as the
     * syntax defined for entries in a Java
     * {@linkplain java.util.Properties#load(java.io.Reader) properties file}:
     * <ul>
     * <li>{@code key=value}</li>
     * <li>{@code key:value}</li>
     * <li>{@code key value}</li>
     * </ul>
     * {@code group.id} and {@code client.id} are ignored.
     * @return the properties.
     * @since 2.2.4
     * @see org.apache.kafka.clients.consumer.ConsumerConfig
     * @see #groupId()
     * @see #clientIdPrefix()
     */
    String[] properties() default {};

}複制代碼
           
  • id:代表目前節點的唯一辨別,不配置的話會自動配置設定一個id,主動配置的話,groupId會被設定成id的值(前提是idIsGroup這個屬性值沒有被設定成false)。
  • containerFactory:設定監聽容器工廠類。
  • topics:需要監聽的Topic,可監聽多個。
  • topicsPattern:Topic主題,支援屬性占位符,或者是正規表達式。
  • topicPartitions:可以設定更加詳細的監聽資訊,包括topic、partitions和partitionOffsets。
  • containerGroup:設定了這個屬性,目前的監聽器會被加進設定的這個容器組裡面,後面你可以通過周遊這個集合來啟動或終止一組監聽器集合。
  • errorHandler:異常處理器,如果監聽器處理方法抛出異常,你可以設定一個實作了KafkaListenerErrorHandler的異常處理類來處理抛出的異常。
  • groupId:設定目前消費者組id,支援SpEL表達式{@code #{...}}和屬性占位符{@code ${...}}
  • idIsGroup:id是否能用作groupId
  • clientIdPrefix:clientId字首,字尾會預設加上-n來保證并發時該id的唯一性,支援SpEL表達式{@code #{...}}和屬性占位符{@code ${...}}
  • beanRef:此注解中SpEL表達式中使用的僞bean名,用于指向此監聽器的目前bean,進而允許通路封裝bean中的屬性和方法。
  • concurrency:用于覆寫容器工廠中的并發屬性,支援SpEL表達式{@code #{...}}和屬性占位符{@code ${...}}
  • autoStartup:是否自動啟動
  • properties:消費者屬性,将替換在消費者工廠中定義的具有相同名稱的任何屬性(如果消費者工廠支援屬性覆寫)。

5、ReplyingKafkaTemplate簡介

在分析KafkaTemplate方法的時候,發現其實作的接口類KafkaOperations,還有另外一個實作類ReplyingKafkaTemplate,簡單的描述處理流程就是:生産者通過TopicA發送消息,監聽器A從TopicA中擷取到消息,進行業務處理後将響應内容轉發到TopicB,監聽器B從TopicB擷取消息再次進行處理。

通過分析源碼,發現ReplyingKafkaTemplate是利用了請求響應模式,通過設定ProducerRecord.topic屬性可以設定發送topic,通過設定ProducerRecord.Headers屬性可以設定轉發topic,當然也可以在new ReplyingKafkaTemplate()的時候,在GenericMessageListenerContainer中設定轉發topic。

@Configuration
@EnableKafka
public class ReplyKafkaTemplateConfiguration {
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String producer;

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String consumer;

    @Bean
    public KafkaMessageListenerContainer<String, String> replyContainer(@Autowired ConsumerFactory consumerFactory) {
        ContainerProperties containerProperties = new ContainerProperties("topic.reply");
        return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(@Autowired ProducerFactory producerFactory, KafkaMessageListenerContainer replyContainer) {
        ReplyingKafkaTemplate template = new ReplyingKafkaTemplate<>(producerFactory, replyContainer);
        template.setReplyTimeout(10000);
        return template;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(kafkaTemplate());
        return factory;
    }

    @Bean
    @Primary
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        KafkaTemplate template = new KafkaTemplate<>(producerFactory());
        return template;
    }

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(senderProps());
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }


    //消費者配置參數
    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        //連接配接位址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumer);
        //GroupID
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "replyTest");
        //是否自動送出
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        //自動送出的頻率
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        //Session逾時設定
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        //鍵的反序列化方式
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        //值的反序列化方式
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    //生産者配置
    private Map<String, Object> senderProps (){
        Map<String, Object> props = new HashMap<>();
        //連接配接位址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producer);
        //重試,0為不啟用重試機制
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        //控制批處理大小,機關為位元組
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //批量發送,延遲為1毫秒,啟用該功能能有效減少生産者發送消息次數,進而提高并發量
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        //生産者可以使用的總記憶體位元組來緩沖等待發送到伺服器的記錄
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);
        //鍵的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        //值的序列化方式
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}複制代碼
           
@Component
@Slf4j
public class ReplyKafkaTemplateProducer {
    @Autowired
    private ReplyingKafkaTemplate replyingKafkaTemplate;

    public void send() throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>("topic.request", "request message");
        record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "topic.reply".getBytes()));
        RequestReplyFuture<String, String, String> replyFuture = replyingKafkaTemplate.sendAndReceive(record);
        SendResult<String, String> sendResult = replyFuture.getSendFuture().get();
        log.info("send request msg result: " + sendResult.getRecordMetadata());
        ConsumerRecord<String, String> consumerRecord = replyFuture.get();
        log.info("receive reply result: " + consumerRecord.value());
    }
}複制代碼
           
@Component
@Slf4j
public class ReplyKafkaTemplateConsumer {
    @KafkaListener(id = "replyConsumer", topics = "topic.request",containerFactory = "kafkaListenerContainerFactory")
    @SendTo
    public String replyListen(ConsumerRecord<?, ?> record){
        log.info("topic-{}, offset-{}, value-{}", record.topic(), record.offset(), record.value());
        return "reply message";
    }
}複制代碼
           
kafka介紹及使用