文章目录
-
-
- 一 kafka
-
- 1.1 kafka重要名词解释
-
- 1.1.1 producer:
- 1.1.2. Consumer :
- 1.1.3 kafka cluster:
- 1.1.4 Topic:
- 1.1.5 consumer group:
- 1.1.6 Broker:
- 1.1.7 Partition:
- 1.1.8 Offset:
- 1.2 kafka面试常问问题
-
- 1.2.1 kafka数据存储机制(kafka 是怎么存储数据的)
- 1.2.2 consumer是怎么解决负载均衡的?
- 1.2.3 segment是什么(概念)?
- 1.2.4 数据是怎么分发的(数据的分发策略)?
- 1.2.5 kafka存储数据能做到全局有序么?
- 1.2.6 kafka partition和consumer数目关系
- 1.3 kafka常用命令
- 1.4 kafka 可视化(kafka manager)
-
- 1.4.1 kafka-manager简介
- 1.4.2 安装
-
- 1.4.2.1 环境要求
- 1.4.2.2 下载安装 kafka-manager
- 1.5 kafka API
-
- 1.5.1 创建一个生产者
- 1.5.2 创建一个消费者
- 1.5.3 实现一个自定义分区
- 1.5.4 运行结果
- 二 Streaming
-
- 2.1 Streaming简介
- 2.2 Streaming优缺点
- 2.3 Spark与Storm的对比
- 2.4 DStream
- 2.5 使用Streaming实现Wordcount
-
- 2.5.1 安装和启动netcat服务
-
- 2.5.1.1 netcat服务流程
- 2.5.1.2 netcat服务安装
- 2.5.2 编写和运行程序
- 2.5.3 输出结果
-
一 kafka
1.1 kafka重要名词解释
1.1.1 producer:
- 生产者负责将数据传入kafka, 比如flume, java后台服务, logstash
- 生产者可以有多个, 并且可以同时往一个topic中写数据, 也可以同时往同一个partition中写数据
- 每一个生产者都是一个独立的进程, 而且单个生产者就具有分发数据的能力
- 一个生产者可以同时往多个topic中分发数据
1.1.2. Consumer :
消息消费者,向kafka broker获取消息的客户端
1.1.3 kafka cluster:
- kafka由多个broker组成, 一个broker作为一个实例(节点)
- kafka集群可以保存多种类型的数据, 是由多个topic进行分类的
- 一个topic其实就是一个队列
- 每个topic可以创建一个或多个partition, 并且在存储数据时, 有多副本机制, 保证了数据的安全性. partition的数量是可以更改的, 但是只能增加分区数, 不能减少分区数.
- 每个partition是由多个segment组成的, segment的大小是相同的, 默认是1G
- topic中的数据是有多副本机制的, 原始数据和副本数据不会在同一个节点上
1.1.4 Topic:
作为消息的分类, 我们可以理解为一个队列。
1.1.5 consumer group:
- 消费者负责拉取数据, 比如: Streaming, storm, 自己写的java服务
- 消费者组中可以存在多个consumer, 在streaming中一个consumer作为一个线程
- 新增或减少consumer, 即consumer的数量发生变化都会触发负载均衡, 目的是减少部分broker压力, 提高kafka的吞吐量
- 一个consumer group可以消费多个多个分区的数据
- 一个分区的数据最多在同一时刻被一个consumer消费
- 在同一个consumer group中, 数据不可以重复消费, 如果想要重新消费, 可以调整偏移量, 改变组名, 或者使用映射
1.1.6 Broker:
一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
1.1.7 Partition:
为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
1.1.8 Offset:
kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。
1.2 kafka面试常问问题
1.2.1 kafka数据存储机制(kafka 是怎么存储数据的)
- broker先接收到producer传过来的数据, 将数据写入到操作系统的缓存中(pagecache, pagecache会尽可能的使用空闲内存来存储数据)
- 使用sendFile技术, 尽可能多的减少操作系统和应用程序之间的重复缓存, 写数据时是顺序写入(顺序写入的速度可达到600M/S)
1.2.2 consumer是怎么解决负载均衡的?
- 获取consumer消费的起始分区号
- 计算consumer消费的分区数量
- 用起始分区号的hash值%分区数
1.2.3 segment是什么(概念)?
- 一个分区被分为多个相同大小的segment, 默认是1G
- 每个segment是由多个index和log文件组成的, index存储数据对应的索引, 实际的数据是存储在log文件中
- segment是有生命周期的, 默认是168个小时(7天)
1.2.4 数据是怎么分发的(数据的分发策略)?
- kafka接收到数据后, 会根据创建的topic指定的副本数来存储, 多个副本之间会有选举的过程, 即有leader和follower, 数据会首先写入到leader, 然后同步到follower
- kafka会调用分区器, 来进行分发数据, 默认分区器时DefaultPartitioner(默认的分区方法是key.hashCode%numPartition), 也可以自定义分区器, 需要实现Partitioner特质, 实现partition方法
1.2.5 kafka存储数据能做到全局有序么?
不能, 只能做到分区内有序, 如果想做到topic, 可以将分区设置一个, 但是这样会影响吞吐量
1.2.6 kafka partition和consumer数目关系
- 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数 。
- 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 。最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目 。
- 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
- 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
1.3 kafka常用命令
- 查看当前服务器中的所有topic
bin/kafka-topics.sh --list --zookeeper hadoop01:2181
- 创建topic
- 删除topic
bin/kafka-topics.sh --delete --zookeeper hadoop01:2181 --topic test
//需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。
- 通过shell命令发送消息
bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic test1
- 通过shell消费消息
bin/kafka-console-consumer.sh --zookeeper hadoop01:2181 --from-beginning --topic test1
- 查看消费位置
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper hadoop01:2181 --group testGroup
- 查看某个Topic的详情
bin/kafka-topics.sh --topic test --describe --zookeeper hadoop01:2181
- 对分区数进行修改
bin/kafka-topics.sh --zookeeper hadoop01 --alter --partitions 15 --topic utopic
1.4 kafka 可视化(kafka manager)
1.4.1 kafka-manager简介
为了简化开发者和服务工程师维护Kafka集群的工作,Yahoo构建了一个叫做Kafka管理器的基于Web工具,叫做 Kafka Manager。这个管理工具可以很容易地发现分布在集群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建Topic。
1.4.2 安装
1.4.2.1 环境要求
Kafka 0.8.1.1 或者 0.8.2.* 或者 0.9.0.*
Java 8+
zookeeper 2+
1.4.2.2 下载安装 kafka-manager
- 下载编译
# 下载源码包,上传到linux服务器,地址如下:
https://github.com/yahoo/kafka-manager.git
解压kafka-manager-master.zip
cd kafka-manager-master
# 因为要编译。所以下面这步操作要等很久
sbt clean distcd target/
# 在target/universal目录下我们可以看到 kafka-manager
kafka-manager-1.3.3.17.zip
tips:
使用sbt编译打包的时候时间可能会比较长,如果你hang在
Loading project definition from kafka-manager/project
可以修改project/plugins.sbt中的LogLevel参数
将logLevel := Level.Warn修改为logLevel := Level.Debug
编译完成后的显示结果:
- 解压
unzip kafka-manager-1.3.3.17.zip -d /usr/local
cd /usr/local/kafka-manager-1.3.3.17
- 修改配置 conf/application.properties
# 如果zk是集群,这里填写多个zk地址
kafka-manager.zkhosts="node01:2181,node02:2181,node03:2181"
- 启动
kafka-manager 默认的端口是9000,可通过 -Dhttp.port,指定端口; -Dconfig.file=conf/application.conf指定配置文件:
我们用浏览器访问试试,界面非常简洁
1.5 kafka API
1.5.1 创建一个生产者
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
/*
* @Description: 实现一个生产者, 模拟一些数据不断的发送到kafka的topic中, 实现自定义分区器
* ClassName ProducerDemo
* @Author: WCH
* @CreateDate: 2019/1/9$ 11:46$
* @Version: 1.0
*/
object ProducerDemo {
def main(args: Array[String]): Unit = {
// 配置kafka信息
val props = new Properties()
// 配置序列化类型
props.put("serializer.class", "kafka.serializer.StringEncoder")
// 指定kafka集群
props.put("metadata.broker.list", "hadoop01:9092,hadoop02:9092,hadoop03:9092")
// 设置发送数据的相应方式 0, 1, -1
props.put("request.required.acks", "1")
// 设置分区器
props.put("partitioner.class", "day08.kafkaapitest.MyPartitioner")
// 指定topic
val topic = "test"
// 创建producer配置对象
val config = new ProducerConfig(props)
// 创建producer对象
val producer: Producer[String, String] = new Producer(config)
// 模拟数据
for (i <- 1 to 100){
// 要发送的消息
val msg = s"$i: Producer send data"
// 调用send方法来发送消息
producer.send(new KeyedMessage[String,String](topic,msg))
// 避免发送消息过快, 设置一个延迟
Thread.sleep(500)
}
// 如果是实时进行监控的, 则不需要关闭资源
producer.close()
}
}
1.5.2 创建一个消费者
import java.util.Properties
import java.util.concurrent.{ExecutorService, Executors}
import kafka.consumer._
import kafka.message.MessageAndMetadata
import scala.collection.mutable
/*
* @Description: 创建一个消费者
* ClassName ConsumerDemo
* @Author: WCH
* @CreateDate: 2019/1/9$ 14:30$
* @Version: 1.0
*/
class ConsumerDemo(val consumer: String, val stream: KafkaStream[Array[Byte], Array[Byte]]) extends Runnable {
override def run(): Unit = {
val it: ConsumerIterator[Array[Byte], Array[Byte]] = stream.iterator()
while (it.hasNext()) {
val data: MessageAndMetadata[Array[Byte], Array[Byte]] = it.next()
val topic: String = data.topic
val offset: Long = data.offset
val partition: Int = data.partition
val msgByte: Array[Byte] = data.message()
val msg = new String(msgByte)
println(s"consumer:$consumer,topic:$topic,offset:$offset,partition:$partition,massage:$msg")
}
}
}
object ConsumerDemo {
def main(args: Array[String]): Unit = {
// 定义获取topic
val topic = "test"
// 定义一个Map. 用于存储多个topic的信息, K: topic V: 用于获取topic的线程数, 即consumer的数量
val topics = new mutable.HashMap[String, Int]()
topics.put(topic, 1)
// 配置获取kafka的信息
val prpos = new Properties()
/// 定义group
prpos.put("group.id", "group01")
// 指定zk列表
prpos.put("zookeeper.connect", "hadoop01:2181,hadoop02:2181,hadoop03:2181")
// 指定offset
prpos.put("auto.offset.reset", "smallest")
// 创建配置类, 封装配置信息
val config = new ConsumerConfig(prpos)
// 创建consumer对象
val connsumer: ConsumerConnector = Consumer.create(config)
// 开始消费数据, map中 K: topic的名称, V: topic对应的数据
val streams: collection.Map[String, List[KafkaStream[Array[Byte], Array[Byte]]]] = connsumer.createMessageStreams(topics)
// 将topic中的数据拿出来
// KafkaStream 中, K: offset, V: data
val topicStreams: Option[List[KafkaStream[Array[Byte], Array[Byte]]]] = streams.get(topic)
// 创建一个固定线程的线程池
val pool: ExecutorService = Executors.newFixedThreadPool(3)
for (i <- 0 until topicStreams.size) {
pool.execute(new ConsumerDemo(s"consumer: $i", topicStreams.get(i)))
}
}
}
1.5.3 实现一个自定义分区
package day08.kafkaapitest
import kafka.producer.Partitioner
import kafka.utils.{Utils, VerifiableProperties}
/*
* @Description: 实现自定义分区
* ClassName MyPartitioner
* @Author: WCH
* @CreateDate: 2019/1/9$ 14:59$
* @Version: 1.0
*/
class MyPartitioner(props: VerifiableProperties = null) extends Partitioner{
override def partition(key: Any, numPartitions: Int): Int = {
Utils.abs(key.hashCode) % numPartitions
}
}
1.5.4 运行结果
二 Streaming
2.1 Streaming简介
Spark Streaming类似于Apache Storm,用于流式数据的处理。根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。另外Spark Streaming也能和MLlib(机器学习)以及Graphx完美融合。
2.2 Streaming优缺点
优点: 吞吐量大, 开发简单
缺点: 因为是按照批次进行处理, 没有做到真正的实时性
2.3 Spark与Storm的对比
Spark | Storm |
---|---|
开发语言:Scala | 开发语言:Clojure |
编程模型:DStream | 编程模型:Spout/Bolt |
2.4 DStream
Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStreams是一系列连续的同类型的RDD来表示。每个RDD含有一段时间间隔内的数据,如下图:
源代码中关于DStream的介绍:
A DStream internally is characterized by a few basic properties:
- A list of other DStreams that the DStream depends on
- A time interval at which the DStream generates an RDD
-
A function that is used to generate an RDD after each time interval
解释如下:
一个放了多个DStream的列表, 并且DStream之间有依赖关系
每隔一段时间DStream会生成一个RDD, DStream可以放多个同类型的RDD
每隔一段时间生成的RDD都会有一个函数作用在这个RDD上
对数据的操作也是按照RDD为单位来进行的
计算过程由Spark engine来完成
2.5 使用Streaming实现Wordcount
2.5.1 安装和启动netcat服务
2.5.1.1 netcat服务流程
2.5.1.2 netcat服务安装
安装前需要配置好yum源, 然后使用yum命令安装, 如果从官网上直接下载的netcat包需要自己进行编译
yum -y install nc.x86_64
启动netcat, 端口可以自定义, 这里定义的是8888
nc -lk 8888
2.5.2 编写和运行程序
package day08.streaming
import org.apache.hadoop.hdfs.server.common.Storage
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/*
* @Description: 使用Streaming实现Wordcount
* ClassName StreamingWC
* @Author: WCH
* @CreateDate: 2019/1/9$ 16:21$
* @Version: 1.0
*/
object StreamingWC {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("StreamingWC").setMaster("local[2]")
val sc = new SparkContext(conf)
// 创建Streaming的上下文
// 使用类名直接创建
val ssc = new StreamingContext(sc, Durations.seconds(5))
// new StreamingContext(sc,Seconds(5))
// 获取netcat的数据, 这种方式, Streaming会把获取的数据以缓存的方式放到指定缓存级别的地方
val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop02",8888,StorageLevel.MEMORY_AND_DISK)
// 开始分析数据
val res: DStream[(String, Int)] = dStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
// 输出结果到控制台
res.print()
// 开始提交任务到集群
ssc.start()
// 线程等待, 等待处理下一批次任务
ssc.awaitTermination()
}
}
2.5.3 输出结果
在nc服务下输入字符
在控制台事实显示计算结果