天天看点

KafkaKafka介绍Kafka架构Kafka安装部署Kafke命令行操作Kafka Api操作Kafka优化总结参考资料

文章目录

  • Kafka介绍
    • 消息队列的作用
    • 主题和日志
      • 发布消息
      • 读取消息
    • 分配
    • 生产者
    • 消费者
  • Kafka架构
  • Kafka安装部署
  • Kafke命令行操作
  • Kafka Api操作
  • Kafka优化
  • 总结
  • 参考资料

Kafka介绍

官网:http://kafka.apache.org/

ApacheKafka®是一个分布式流媒体平台,用于构建数据管道和流应用程序。它具有水平可扩展性,容错性,快速性,并在数千家公司中的生产中运行。

流媒体平台有三个关键功能:

发布和订阅:读取和写入数据流,类似于消息队列或企业消息传递系统。

处理:编写可扩展的处理应用程序、实时响应事件。

存储:将数据流安全地存储在分布式、副本的容错集群中。

Kafka通常用于两大类应用:

构建可在系统或应用程序之间可靠获取数据的实时流数据管道

构建转换或响应数据流的实时流应用程序

Kafka有四个核心API:

  1. 生产者(Producer)API:允许应用程序发布的记录流至一个或多个Kafka的话题。
  2. 消费者(Consumer)API:允许应用程序订阅一个或多个主题,并处理所产生的对他们记录的数据流。
  3. 流(Streams)API:允许应用程序充当流处理器,从一个或多个主题消耗的输入流,并产生一个输出流至一个或多个输出的主题,有效地变换所述输入流,以输出流。
  4. 连接器(Connector)API:允许构建和运行Kafka主题连接到现有的应用程序或数据系统中重用生产者或消费者。例如,关系数据库的连接器可能捕获对表的每个更改。
KafkaKafka介绍Kafka架构Kafka安装部署Kafke命令行操作Kafka Api操作Kafka优化总结参考资料

  在流计算中,kafka主要功能是用来缓存数据,storm可以通过消费kafka中的数据进行流计算。

是一套开源的消息系统,由scala写成。支持javaAPI的。

  kafka最初由LinkedIn公司开发,2011年开源。

  2012年从Apache毕业。

  是一个分布式消息队列,kafka读消息保存采用Topic进行归类。分布式消息队列有两个重要的角色:发送消息的Producer(生产者)和接收消息的Consumer(消费者)。

消息队列的作用

  1. 解耦

    使用消息队列作为传输存储消息的通道,可以避免因一方出问题后数据丢失的安全问题。如图所示:

    KafkaKafka介绍Kafka架构Kafka安装部署Kafke命令行操作Kafka Api操作Kafka优化总结参考资料

      上图中显示了无消息队列和有消息队列的消息流程图。其中,如果无消息队列,当发送消息方发送消息时,如果接收消息方出现了问题,此时这一段消息将会丢失或发送消息方重试多次发送。

      而加入了消息队列后,发送消息方和接收消息方不再直接相连,接收消息方宕机后,消息会存储到消息队列的内存或磁盘中,直到接收消息方重新启动后会从消息队列获取消息,而发送消息方在发送完消息之后不需要关心接收消息方是否接收到消息。

      消息队列就是传输和存储消息的通道。

  2. 拓展性

      上图中只是展示了一个发送消息方和接收消息方,在分布式消息队列中,就是生产者和消费者。在实际应用中,一个生产者发送的消息会由多个消费者消费,而一个消费者可以消费多个生产者的消息,加入消息队列,对消息的传输进入统一的管理,可以便于新增生产者和消费者,同时不影响其他系统的工作。

    KafkaKafka介绍Kafka架构Kafka安装部署Kafke命令行操作Kafka Api操作Kafka优化总结参考资料
      上图中也清楚显示了消息队列的作用。在Kafka中,topic是用于消息队列的实现,生产者和消费者通过注册和订阅topic,实现发送和接收消息的功能。当需要新增一个生产者或消费者时,只需要注册或订阅对应的topic就可以实现发送和接收消息的功能。
  3. 灵活

      面对访问量剧增的情况下,分布式消息队列可以把访问量均衡分配到多个消息队列中,当所有消息队列都满了后,会拒绝访问,避免系统完全瘫痪。

  4. 可恢复性

      当分布式消息队列部分组件失效,不会影响到整个系统,可以随时恢复过来。

  5. 缓冲性

       消息统一由消息队列中间件管理,可以控制数据流经过系统的速度,避免系统负载过重,出现问题,实现服务治理的效果。

  6. 顺序保证性

      消息队列遵循先入先出原则,保证了数据的顺序。

  7. 异步通信

      kafka消息队列提供了异步处理的机制,允许用户把消息放到队列中,而不立即处理。

主题和日志

Kafka记录流的核心抽象是Topic——主题,主题是发布消息的类别或订阅源名称。上图中显示,一个主题一般都是多用户的,多个生产者或多个消费者写入或读取它的数据。

发布消息

对于每个主题,Kafka集群都维护一个分区日志,如下图:

KafkaKafka介绍Kafka架构Kafka安装部署Kafke命令行操作Kafka Api操作Kafka优化总结参考资料

每个分区都是一个有序的,不可变的记录序列,不断附加到结构化的提交日志中。分区中的记录每个都被分配一个称为偏移的顺序ID号,它唯一地标识分区中的每个记录。即Kafka是用分区日志来实现消息队列数据结构的。

Kafka集群会持久化地保留所有已发布的消息,即不管它们是否已被消耗,Kafka都会保存它们在日志分区中。持久化消息有三个配置方式:

(1)消息达到默认10000条时将数据写入到日志文件。

(2)当达到某个时间时,强制执行一次flush,默认值为null。

(3)周期性检查消息是否需要flush。

另外,配置数据保存策略,可以将日志文件删除。保存策略有两种方式:

(1)按时间粒度,可设置分钟或小时,达到一定时间就处理,默认是7天。

(2)按文件大小,设置最大文件大小,达到上限就删除。同时可以设置文件大小检查的周期。

当达到以上保存策略的其中一个条件时,会把日志文件作一个标记"delete",当这些标记了"delete"的文件达到一定大小或达到一定时间后才会真正删除。详情请查看官方文档。

读取消息

每个消费者保留的唯一元数据是该消费者在日志中的偏移或位置,该偏移可以由消费者控制,默认在读取消息时会线性提高其偏移量。消费者可以控制偏移,即消费者可以重复消费旧的消息或直接跳到最新的消息处,且对集群或其他消费者没有太大影响。

KafkaKafka介绍Kafka架构Kafka安装部署Kafke命令行操作Kafka Api操作Kafka优化总结参考资料

分配

日志分区可以分布在多个Kafka集群中的服务器上,而一个主题可以有多个日志分区,因此,它可以处理任意数量的消息。

每个服务器处理数据并请求分区的共享。每个分区都在可配置数量的服务器上进行复制,以实现容错。

每个分区都有一个服务器充当“Leader”,零个或多个服务器充当“Follower”,Leader负责处理分区的所有读取和写入请求,而分区的Follower则被动地复制Leader,如果Leader宕机了,其中一个Follower会自动成为新的Leader。每个服务器都充当某些分区的Leader服务器和其他分区Leader服务器的Follower。

生产者

生产者将消息发布到它指定的主题,并负责选择把记录分配到主题中的哪个分区。可自定义设置分区信息。

消费者

消费者使用消费者组来标记自己,并且发布到主题的每个记录会被传递到订阅了该主题的消费者组中的一个消费者实例。消费者实例可以在单独的进程中,也可以在不同的机器上。

如果消费者实例在相同的消费者组中,则会均衡负载到消费者实例上。

如果消费者实例在不同的消费者组中,则会广播到所有消费者进程中。

Kafka架构

kafka依赖zookeeper,用zk保存元数据信息。搭建kafka集群需要先搭建zookeeper集群。下面首先以微信公众号的消息发送接收为例,展示一下Kafka的整体流程。

KafkaKafka介绍Kafka架构Kafka安装部署Kafke命令行操作Kafka Api操作Kafka优化总结参考资料

上图中后台系统充当Producer,用户客户端充当Consumer,而Kafka集群充当消息队列的角色。

下面看看细节实现:

KafkaKafka介绍Kafka架构Kafka安装部署Kafke命令行操作Kafka Api操作Kafka优化总结参考资料
  1. 生产者把消息发布到指定主题的某个分区中;
  2. kafka把主题中的消息传递给订阅了该主题的消费者组;
  3. 默认情况下,消费者组会把所订阅主题的分区均衡分配给消费者实例来处理,当传递过来的消息是分区Partition中的,则把它传递给负责此分区的消费者实例处理。

Kafka安装部署

上面对Kafka进行了详细的说明,下面开始使用Kafka。

  1. 官网下载安装包:http://kafka.apache.org/downloads,本文使用的是2.11版本。
  2. 上传到服务器并解压文件
tar -zxvf .tar
           
  1. 修改配置文件config/server.properties
broker.id=0  #集群中的每个服务器的id不同
delete.topic.enable=true #是否允许删除主题
log.dirs=/kafka/logs #指定日志保存的位置,需要事先创建指定的目录
zookeeper.connect=hd-even-01:2181,hd-even-02:2181,hd-even-03:2181  #指定zookeeper集群
           
  1. 启动zk集群
zkServer.sh start
           
  1. 启动kafka集群
# &代表是后台启动
bin/kafka-server-start.sh config/server.properties &
           
  1. 关闭的命令
bin/kafka-server-stop.sh
           

Kafke命令行操作

1. 查看当前集群已存在的主题
bin/kafka-topic.sh --zookeeper hd-even-01:2181 --list
2. 创建topic
bin/kafka-topic.sh --zookeeper hd-even-01:2181 --create --replication-factor 3 --partitions 1 --topic test

--zookeeper 连接zk集群
--create 创建命令
--replication-factor 副本数
--partition 分区数
--topic 主题名
3. 删除主题
bin/kafka-topic.sh --zookeeper hd-even-01:2181 --delete --topic test
4. 发送消息
	生产者启动(9092是Kafka对外开放的接口):
	bin/kafka-console-producer.sh --broker-list hd-even-01:9092 --topic test
	消费者启动:
	bin/kafka-console-consumer.sh --bootstrap-server hd-even-01:9092 --topic test --from-beginning
	--bootstrap-server 指定kafka集群
	--from-beginning 表示消费者偏移为第一个位置
5. 查看主题详细信息
bin/kafka-topic.sh --zookeeper hd-even-01:2181 --describe --topic test
           

Kafka Api操作

除了通过命令行的方式操作Kafka,还可以使用Java Api来操作Kafka,以便与企业项目集成。

  1. pom.xml导入依赖:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.0.0</version>
</dependency>
           
  1. 生产者Producer:
package com.even.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;


public class Producer {
    public static void main(String[] args) {

        //1.配置生产者属性(指定多个参数)
        Properties prop = new Properties();

        //参数配置
        //kafka节点的地址
        prop.put("bootstrap.servers", "192.168.11.136:9092");
        //发送消息是否等待应答
        prop.put("acks", "all");
        //配置发送消息失败重试
        prop.put("retries", "0");
        //配置批量处理消息大小
        prop.put("batch.size", "10241");
        //配置批量处理数据延迟
        prop.put("linger.ms", "5");
        //配置内存缓冲大小
        prop.put("buffer.memory", "12341235");
        //消息在发送前必须序列化
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //2.实例化producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
        //3.发送消息
        for (int i = 0; i < 10000; i++) {
            producer.send(new ProducerRecord<String, String>("test", "even" + i));
        }
        //4.释放资源
        producer.close();
    }
}

           
  1. 消费者Consumer:
package com.even.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;


public class Consumer {
    public static void main(String[] args) {
        //1.配置消费者属性
        Properties prop = new Properties();
        //配置属性
        //服务器地址指定
        prop.put("bootstrap.servers", "192.168.11.136:9092");
        //配置消费者组
        prop.put("group.id", "g1");
        //配置是否自动确认offset
        prop.put("enable.auto.commit", "true");
//        prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        //序列化
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //2.实例消费者
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);

        //订阅消息主题
        consumer.subscribe(Collections.singletonList("test"));
        //3.拉消息 推push 拉poll
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            //遍历消息
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.topic() + "------" + record.value());
            }

        }
    }
}
           
  1. 先启动Producer,再启动Consumer,结果打印:
    KafkaKafka介绍Kafka架构Kafka安装部署Kafke命令行操作Kafka Api操作Kafka优化总结参考资料

Kafka优化

Kafka可以认为是一个内存数据库,消息保存在内存中,定期持久化到硬盘。因此,对Kafka内存的合理使用是Kafka优化的关键。下面列出几点常用优化:

  1. 增加Kafka堆内存大小,修改kafka-server-start.sh文件
    KafkaKafka介绍Kafka架构Kafka安装部署Kafke命令行操作Kafka Api操作Kafka优化总结参考资料
    堆内存默认是1G,可以适当调整。
  2. 设置持久化数据的策略

    上面提到,Kafka通过一定的方式,会把内存的数据持久化到硬盘,这三种方式分别为:

    (1)消息达到默认10000条时将数据写入到日志文件。

    (2)当达到某个时间时,强制执行一次flush,默认值为null。

    (3)周期性检查消息是否需要flush。

  3. 限流

    对于生产者发布的消息进行限流。

  4. 集群模式

    把Kafka配置成集群模式,当一个服务器的内存快要满时,可以把数据均衡分配到另一台服务器,然后再把数据持久化到本地硬盘,避免内存溢出。

总结

Kafka有几个重点要理解的概念:生产者Producer,消费者Consumer,主题Topic,分区Partition,消费者组ConsumerGrouping。生产者负责发布消息,Topic是一个抽象概念,包含多个分区Partition,分区Partition是具体存储消息的地方,消费者组ConsumerGrouping从订阅的主题获取消息并传递给消费者Consumer。

参考资料

Kafka官方文档:http://kafka.apache.org/