kafka
消息队列内部实现原理
kafka架构
一、下载kafka安装包
二、kafka安装包的解压
三、设置环境变量
四、配置kafka文件
4.1 server.properties
五、启动kafka集群并测试
5.1创建topic
5.2创建broker
5.3创建订阅者
六、kafka常用命令
6.1创建主题(4个分区,2个副本)
6.2查询
6.3发送和消费
6.4平衡leader
6.5kafka自带压测命令
用途:在流式计算中,kafka一般用来缓存数据,storm通过消费kafka的数据进行计算。
kafka是一个分布式消息队列。kafka对消息保存时根据topic进行归类,发送消息者称为producer,消息接受者称为consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。
<1>. 点对点模式(类似接受文件,一对一,消费者主动拉取数据,消息收到后消息清除)
-点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此。
<2>. 发布/订阅模式(类似公众号,一对多,数据生产后,推送给所有订阅者)
发布订阅模型则是一个基于推送的消息传送模型。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。
producer:**消息生产者,就是向kafka broker发消息的客户端。
consumer:**消息消费者,向kafka broker取 消息的客户端
topic:**可以理解为一个队列。
consumer group(cg):**消费者组,kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的id,即group id。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。
broker:**一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
partition:**为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
offset:**kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka
kafka安装前提,需要有java环境以及zookeeper集群。
三台服务器地址:
192.168.11.x1 alary001
192.168.11.x2 alary002
192.168.11.x3 alary003
第一次学习时,还在网上找各种下载地址以及去官网下载,发现各种麻烦和不靠谱,后来慢慢学会,
可以通过国内的各个镜像站进行下载,
清华大学开源软件镜像站——apach下相关软件
找到kafka,下载对应版本即可。
下载好后,将kafka的tar.gz安装包通过rz kafka-xxxx.tar.gz文件传至服务器。
进入到kafka的kakfa-xxx.tar.gz的目录下,解压文件:
可以将kafka-xxx进行更名:
进入 /etc/profile配置环节变量
重启环境变量配置信息:
进入kafka配置文件目录:
通过ll或者ls查看目录下的文件。
首先,我们来修改server.properties这个配置文件
配置内容如下:
broker.id=1 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样 listeners=plaintext://10.1.32.85:9092 #kafka监听地址 num.network.threads=3 #这个是borker进行网络处理的线程数 num.io.threads=8 #这个是borker进行i/o处理的线程数 socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能 socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘 socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小 log.dirs=/tmp/kafka-logs #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个 num.partitions=1 #默认的分区数,一个topic默认1个分区数 num.recovery.threads.per.data.dir=1 log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件 log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除 zookeeper.connect=10.1.32.85:2181,10.1.32.193:2181,10.1.32.161:2181 #设置zookeeper的连接端口 zookeeper.connection.timeout.ms=6000 #连接zookeeper的超时时间
之前第一个配置时,遇到一个坑,直接在配置后面加注释,如:
在解析配置文件时,会报错无法识别后面的汉字等内容,所以讲所有注释全部换行 注释,如下:
即可解决问题;
server.propertis配置:
此处,要想直接通过hostname来访问,需要配置host信息。
配置完第一台机器alary001后,通过scp 将第一台机器的kakfa分发给其他机器,并修改相关内容。
启动kafka必须三台都启动,如果是通过xshell进行练习时,可以通过在xshell中鼠标右键,发送键输入到所有会话,即可同时启动kafka。
kafka启动命令:
通过jps命令来检查服务是否启动:
在kafka的发布机器(alary001)创建一个broker:
另外的其他机器上,创建订阅者:
若通过发布者发送的消息,订阅者成功收到,则kafka集群搭建完毕!
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
查询集群描述
bin/kafka-topics.sh --describe --zookeeper
消费者列表查询
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
新消费者列表查询(支持0.9版本+)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
显示某个消费组的消费详情(仅支持offset存储在zookeeper上的)
bin/kafka-run-class.sh kafka.tools.consumeroffsetchecker --zookeeper localhost:2181 --group test
显示某个消费组的消费详情(支持0.9版本+)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group
生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
消费者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092