1.下载Kafka
2.启动服务端
Kafka需要用到ZooKeepr,所以需要先启动一个ZooKeepr服务端,如果没有单独的ZooKeeper服务端,可以使用Kafka自带的脚本快速启动一个单节点ZooKeepr实例
启动Kafka服务端实例
3.创建一个Kafka Topic
创建一个名为test的topic,这个topic只有一个分区和一个副本
创建以后就可以查看了
另外,除了手动创建topics,可以配置Brokers自动创建topics
4.发送一些消息
Kafka带有一个命令行工具kafka-console-producer.sh,可以从一个文件或者标准输入读入数据然后以消息的方式发送到Kafka集群。默认每行将被单独作为消息发送。
5.启动一个消费者
Kafka也自带一个命令行工具用以消费Kafka集群的消息
producer和consumer分别开两个Linu终端,producer端输入一些内容,就可以看到consumer端会实时显示producer输入的内容
6.建立一个multi-broker集群
以上我们建立运行了单个Kafka broker,但是对于Kafka,单个broker只是Kafka集群的的一个成员,下面我们将扩展Kafka集群到三个broker实例
主要修改几个参数
broker.id
listeners
log.dirs
broker.id是一个Kafka broker实例在集群中的唯一属于,每个实例需要不同。
现在创建一个具有三个复制成员的topic
查看集群中每个broker的状态
先列出所有分区的概要信息,一个分区显示一行
Leader 负责给定分区的所有读写操作,每个集群节点会是所有分区中随机选取的分区的leader
Replicas 列出当前分区的复制节点,不管这些节点是否是Leader甚至是否当前存活
Isr in-sync复制集的子集,列出当前存活并且赶上leader的集群节点
向新的topic发送一些消息
输入一些消息
再开一个终端消费这些消息
可以看到输入的消息很快就别消费掉了
现在测试搭建的Kafka Cluster的容错能力
Leader:0 表示Broker 0 是leader,Broker 1和Broker 2是replica
现在我们把Broker 0 给kill 掉
再查看集群节点状态
可以看到Leader已经切换到Broker 2了,Broker 0已经不在Isr这个子集中了,Replicas还是三个成员
再次执行消费者
可以看到之前生产者发送的消息依然可以被消费,虽然负责消息写入的leader已经挂掉了
7.使用Kfaka Connect导入导出数据
从Linux终端读入数据然后将数据输出到终端方便了解Kafka,接下来使用Kafka从其他数据来源导入数据,然后导出数据到其他系统中去。对于很多系统来说,可以使用Kafka Connect来导入导出数据来取代重新编写一些自定义代码。Kafka Connect是一个Kafka自带的工具用于导入导出数据。
先向一个文件中写入一些数据
然后,启动两个Kafka Connector以standalone模式运行
connect-standalone.properties
connect-file-source.properties
connect-file-sink.properties
一旦启动Kafka Connect进程,source connector会从test.txt中读入数据,然后将这些数据推送给connect-test这个topic,sink connector会从这个topic读出数据并写入数据到test.sink.txt文件
消息是存储在connect-test这个topic中的
connector的终端先不要断开,继续向test.txt文件中追加几行,看看console-consumer是否会有刷新
可以看到test.sink.txt这个文件也多了一行
8.Use Kafka Streams to process data
Kafka Streams是Kafka用于实时流数据处理和分析Kafka brokers中存储的数据的客户端库。
输出
参考文档:
http://kafka.apache.org/documentation.html#quickstart