天天看点

spark08--kafka组件,面试题,常用命令,可视化,api,Streaming简介,DStream,nc服务,Streaming实现Wordcount

文章目录

      • 一 kafka
        • 1.1 kafka重要名词解释
          • 1.1.1 producer:
          • 1.1.2. Consumer :
          • 1.1.3 kafka cluster:
          • 1.1.4 Topic:
          • 1.1.5 consumer group:
          • 1.1.6 Broker:
          • 1.1.7 Partition:
          • 1.1.8 Offset:
        • 1.2 kafka面试常问问题
          • 1.2.1 kafka数据存储机制(kafka 是怎么存储数据的)
          • 1.2.2 consumer是怎么解决负载均衡的?
          • 1.2.3 segment是什么(概念)?
          • 1.2.4 数据是怎么分发的(数据的分发策略)?
          • 1.2.5 kafka存储数据能做到全局有序么?
          • 1.2.6 kafka partition和consumer数目关系
        • 1.3 kafka常用命令
        • 1.4 kafka 可视化(kafka manager)
          • 1.4.1 kafka-manager简介
          • 1.4.2 安装
            • 1.4.2.1 环境要求
            • 1.4.2.2 下载安装 kafka-manager
        • 1.5 kafka API
          • 1.5.1 创建一个生产者
          • 1.5.2 创建一个消费者
          • 1.5.3 实现一个自定义分区
          • 1.5.4 运行结果
      • 二 Streaming
        • 2.1 Streaming简介
        • 2.2 Streaming优缺点
        • 2.3 Spark与Storm的对比
        • 2.4 DStream
        • 2.5 使用Streaming实现Wordcount
          • 2.5.1 安装和启动netcat服务
            • 2.5.1.1 netcat服务流程
            • 2.5.1.2 netcat服务安装
          • 2.5.2 编写和运行程序
          • 2.5.3 输出结果

一 kafka

1.1 kafka重要名词解释

1.1.1 producer:
  • 生产者负责将数据传入kafka, 比如flume, java后台服务, logstash
  • 生产者可以有多个, 并且可以同时往一个topic中写数据, 也可以同时往同一个partition中写数据
  • 每一个生产者都是一个独立的进程, 而且单个生产者就具有分发数据的能力
  • 一个生产者可以同时往多个topic中分发数据
1.1.2. Consumer :

消息消费者,向kafka broker获取消息的客户端

1.1.3 kafka cluster:
  • kafka由多个broker组成, 一个broker作为一个实例(节点)
  • kafka集群可以保存多种类型的数据, 是由多个topic进行分类的
  • 一个topic其实就是一个队列
  • 每个topic可以创建一个或多个partition, 并且在存储数据时, 有多副本机制, 保证了数据的安全性. partition的数量是可以更改的, 但是只能增加分区数, 不能减少分区数.
  • 每个partition是由多个segment组成的, segment的大小是相同的, 默认是1G
  • topic中的数据是有多副本机制的, 原始数据和副本数据不会在同一个节点上
1.1.4 Topic:

作为消息的分类, 我们可以理解为一个队列。

1.1.5 consumer group:
  • 消费者负责拉取数据, 比如: Streaming, storm, 自己写的java服务
  • 消费者组中可以存在多个consumer, 在streaming中一个consumer作为一个线程
  • 新增或减少consumer, 即consumer的数量发生变化都会触发负载均衡, 目的是减少部分broker压力, 提高kafka的吞吐量
  • 一个consumer group可以消费多个多个分区的数据
  • 一个分区的数据最多在同一时刻被一个consumer消费
  • 在同一个consumer group中, 数据不可以重复消费, 如果想要重新消费, 可以调整偏移量, 改变组名, 或者使用映射
1.1.6 Broker:

一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。

1.1.7 Partition:

为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。

1.1.8 Offset:

kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。

1.2 kafka面试常问问题

1.2.1 kafka数据存储机制(kafka 是怎么存储数据的)
  1. broker先接收到producer传过来的数据, 将数据写入到操作系统的缓存中(pagecache, pagecache会尽可能的使用空闲内存来存储数据)
  2. 使用sendFile技术, 尽可能多的减少操作系统和应用程序之间的重复缓存, 写数据时是顺序写入(顺序写入的速度可达到600M/S)
1.2.2 consumer是怎么解决负载均衡的?
  1. 获取consumer消费的起始分区号
  2. 计算consumer消费的分区数量
  3. 用起始分区号的hash值%分区数
1.2.3 segment是什么(概念)?
  1. 一个分区被分为多个相同大小的segment, 默认是1G
  2. 每个segment是由多个index和log文件组成的, index存储数据对应的索引, 实际的数据是存储在log文件中
  3. segment是有生命周期的, 默认是168个小时(7天)
1.2.4 数据是怎么分发的(数据的分发策略)?
  1. kafka接收到数据后, 会根据创建的topic指定的副本数来存储, 多个副本之间会有选举的过程, 即有leader和follower, 数据会首先写入到leader, 然后同步到follower
  2. kafka会调用分区器, 来进行分发数据, 默认分区器时DefaultPartitioner(默认的分区方法是key.hashCode%numPartition), 也可以自定义分区器, 需要实现Partitioner特质, 实现partition方法
1.2.5 kafka存储数据能做到全局有序么?

不能, 只能做到分区内有序, 如果想做到topic, 可以将分区设置一个, 但是这样会影响吞吐量

1.2.6 kafka partition和consumer数目关系
  1. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数 。
  2. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 。最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目 。
  3. 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
  4. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化

1.3 kafka常用命令

  • 查看当前服务器中的所有topic
bin/kafka-topics.sh --list --zookeeper hadoop01:2181
           
  • 创建topic
  • 删除topic
bin/kafka-topics.sh --delete --zookeeper hadoop01:2181 --topic test
//需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。
           
  • 通过shell命令发送消息
bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic test1
           
  • 通过shell消费消息
bin/kafka-console-consumer.sh --zookeeper hadoop01:2181 --from-beginning --topic test1
           
  • 查看消费位置
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper hadoop01:2181 --group testGroup
           
  • 查看某个Topic的详情
bin/kafka-topics.sh --topic test --describe --zookeeper hadoop01:2181
           
  • 对分区数进行修改
bin/kafka-topics.sh --zookeeper  hadoop01 --alter --partitions 15 --topic utopic
           

1.4 kafka 可视化(kafka manager)

1.4.1 kafka-manager简介

为了简化开发者和服务工程师维护Kafka集群的工作,Yahoo构建了一个叫做Kafka管理器的基于Web工具,叫做 Kafka Manager。这个管理工具可以很容易地发现分布在集群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建Topic。

1.4.2 安装

1.4.2.1 环境要求

Kafka 0.8.1.1 或者 0.8.2.* 或者 0.9.0.*
Java 8+
zookeeper 2+
           

1.4.2.2 下载安装 kafka-manager

  • 下载编译
# 下载源码包,上传到linux服务器,地址如下:
https://github.com/yahoo/kafka-manager.git
解压kafka-manager-master.zip
cd kafka-manager-master
# 因为要编译。所以下面这步操作要等很久
sbt clean distcd target/ 
# 在target/universal目录下我们可以看到 kafka-manager
kafka-manager-1.3.3.17.zip
           

tips:

使用sbt编译打包的时候时间可能会比较长,如果你hang在

Loading project definition from kafka-manager/project

可以修改project/plugins.sbt中的LogLevel参数

将logLevel := Level.Warn修改为logLevel := Level.Debug

编译完成后的显示结果:

spark08--kafka组件,面试题,常用命令,可视化,api,Streaming简介,DStream,nc服务,Streaming实现Wordcount
  • 解压
unzip kafka-manager-1.3.3.17.zip -d /usr/local
cd /usr/local/kafka-manager-1.3.3.17
           
  • 修改配置 conf/application.properties
# 如果zk是集群,这里填写多个zk地址
kafka-manager.zkhosts="node01:2181,node02:2181,node03:2181"
           
  • 启动

kafka-manager 默认的端口是9000,可通过 -Dhttp.port,指定端口; -Dconfig.file=conf/application.conf指定配置文件:

我们用浏览器访问试试,界面非常简洁

spark08--kafka组件,面试题,常用命令,可视化,api,Streaming简介,DStream,nc服务,Streaming实现Wordcount

1.5 kafka API

1.5.1 创建一个生产者
import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

/*
 * @Description:    实现一个生产者, 模拟一些数据不断的发送到kafka的topic中, 实现自定义分区器
 * ClassName        ProducerDemo
 * @Author:         WCH
 * @CreateDate:     2019/1/9$ 11:46$
 * @Version:        1.0
*/
object ProducerDemo {
  def main(args: Array[String]): Unit = {
    // 配置kafka信息
    val props = new Properties()
    // 配置序列化类型
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    // 指定kafka集群
    props.put("metadata.broker.list", "hadoop01:9092,hadoop02:9092,hadoop03:9092")
    // 设置发送数据的相应方式 0, 1, -1
    props.put("request.required.acks", "1")
    // 设置分区器
    props.put("partitioner.class", "day08.kafkaapitest.MyPartitioner")
    // 指定topic
    val topic = "test"

    // 创建producer配置对象
    val config = new ProducerConfig(props)
    // 创建producer对象
    val producer: Producer[String, String] = new Producer(config)
    // 模拟数据
    for (i <- 1 to 100){
      // 要发送的消息
      val msg = s"$i: Producer send data"
      // 调用send方法来发送消息
      producer.send(new KeyedMessage[String,String](topic,msg))
      // 避免发送消息过快, 设置一个延迟
      Thread.sleep(500)
    }
    // 如果是实时进行监控的, 则不需要关闭资源
    producer.close()
  }
}

           
1.5.2 创建一个消费者
import java.util.Properties
import java.util.concurrent.{ExecutorService, Executors}

import kafka.consumer._
import kafka.message.MessageAndMetadata

import scala.collection.mutable

/*
 * @Description:    创建一个消费者
 * ClassName        ConsumerDemo
 * @Author:         WCH
 * @CreateDate:     2019/1/9$ 14:30$
 * @Version:        1.0
*/
class ConsumerDemo(val consumer: String, val stream: KafkaStream[Array[Byte], Array[Byte]]) extends Runnable {
  override def run(): Unit = {
    val it: ConsumerIterator[Array[Byte], Array[Byte]] = stream.iterator()
    while (it.hasNext()) {
      val data: MessageAndMetadata[Array[Byte], Array[Byte]] = it.next()
      val topic: String = data.topic
      val offset: Long = data.offset
      val partition: Int = data.partition
      val msgByte: Array[Byte] = data.message()
      val msg = new String(msgByte)
      println(s"consumer:$consumer,topic:$topic,offset:$offset,partition:$partition,massage:$msg")
    }
  }
}

object ConsumerDemo {
  def main(args: Array[String]): Unit = {
    // 定义获取topic
    val topic = "test"
    // 定义一个Map. 用于存储多个topic的信息, K: topic  V: 用于获取topic的线程数, 即consumer的数量
    val topics = new mutable.HashMap[String, Int]()

    topics.put(topic, 1)
    // 配置获取kafka的信息
    val prpos = new Properties()
    /// 定义group
    prpos.put("group.id", "group01")
    // 指定zk列表
    prpos.put("zookeeper.connect", "hadoop01:2181,hadoop02:2181,hadoop03:2181")
    // 指定offset
    prpos.put("auto.offset.reset", "smallest")

    // 创建配置类, 封装配置信息
    val config = new ConsumerConfig(prpos)
    // 创建consumer对象
    val connsumer: ConsumerConnector = Consumer.create(config)

    // 开始消费数据, map中 K: topic的名称, V: topic对应的数据
    val streams: collection.Map[String, List[KafkaStream[Array[Byte], Array[Byte]]]] = connsumer.createMessageStreams(topics)
    // 将topic中的数据拿出来
    // KafkaStream 中, K: offset, V: data
    val topicStreams: Option[List[KafkaStream[Array[Byte], Array[Byte]]]] = streams.get(topic)
    // 创建一个固定线程的线程池
    val pool: ExecutorService = Executors.newFixedThreadPool(3)
    for (i <- 0 until topicStreams.size) {
      pool.execute(new ConsumerDemo(s"consumer: $i", topicStreams.get(i)))
    }


  }
}

           
1.5.3 实现一个自定义分区
package day08.kafkaapitest

import kafka.producer.Partitioner
import kafka.utils.{Utils, VerifiableProperties}

/*
 * @Description:    实现自定义分区
 * ClassName        MyPartitioner
 * @Author:         WCH
 * @CreateDate:     2019/1/9$ 14:59$
 * @Version:        1.0
*/
class MyPartitioner(props: VerifiableProperties = null) extends Partitioner{
  override def partition(key: Any, numPartitions: Int): Int = {
    Utils.abs(key.hashCode) % numPartitions
  }
}

           
1.5.4 运行结果
spark08--kafka组件,面试题,常用命令,可视化,api,Streaming简介,DStream,nc服务,Streaming实现Wordcount

二 Streaming

2.1 Streaming简介

Spark Streaming类似于Apache Storm,用于流式数据的处理。根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。另外Spark Streaming也能和MLlib(机器学习)以及Graphx完美融合。

spark08--kafka组件,面试题,常用命令,可视化,api,Streaming简介,DStream,nc服务,Streaming实现Wordcount

2.2 Streaming优缺点

优点: 吞吐量大, 开发简单

缺点: 因为是按照批次进行处理, 没有做到真正的实时性

spark08--kafka组件,面试题,常用命令,可视化,api,Streaming简介,DStream,nc服务,Streaming实现Wordcount

2.3 Spark与Storm的对比

Spark Storm
spark08--kafka组件,面试题,常用命令,可视化,api,Streaming简介,DStream,nc服务,Streaming实现Wordcount
spark08--kafka组件,面试题,常用命令,可视化,api,Streaming简介,DStream,nc服务,Streaming实现Wordcount
开发语言:Scala 开发语言:Clojure
编程模型:DStream 编程模型:Spout/Bolt
spark08--kafka组件,面试题,常用命令,可视化,api,Streaming简介,DStream,nc服务,Streaming实现Wordcount
spark08--kafka组件,面试题,常用命令,可视化,api,Streaming简介,DStream,nc服务,Streaming实现Wordcount

2.4 DStream

Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStreams是一系列连续的同类型的RDD来表示。每个RDD含有一段时间间隔内的数据,如下图:

spark08--kafka组件,面试题,常用命令,可视化,api,Streaming简介,DStream,nc服务,Streaming实现Wordcount

源代码中关于DStream的介绍:

A DStream internally is characterized by a few basic properties:

  • A list of other DStreams that the DStream depends on
  • A time interval at which the DStream generates an RDD
  • A function that is used to generate an RDD after each time interval

    解释如下:

    一个放了多个DStream的列表, 并且DStream之间有依赖关系

    每隔一段时间DStream会生成一个RDD, DStream可以放多个同类型的RDD

    每隔一段时间生成的RDD都会有一个函数作用在这个RDD上

    spark08--kafka组件,面试题,常用命令,可视化,api,Streaming简介,DStream,nc服务,Streaming实现Wordcount

对数据的操作也是按照RDD为单位来进行的

spark08--kafka组件,面试题,常用命令,可视化,api,Streaming简介,DStream,nc服务,Streaming实现Wordcount

计算过程由Spark engine来完成

spark08--kafka组件,面试题,常用命令,可视化,api,Streaming简介,DStream,nc服务,Streaming实现Wordcount

2.5 使用Streaming实现Wordcount

2.5.1 安装和启动netcat服务

2.5.1.1 netcat服务流程

spark08--kafka组件,面试题,常用命令,可视化,api,Streaming简介,DStream,nc服务,Streaming实现Wordcount

2.5.1.2 netcat服务安装

安装前需要配置好yum源, 然后使用yum命令安装, 如果从官网上直接下载的netcat包需要自己进行编译

yum -y install nc.x86_64
           

启动netcat, 端口可以自定义, 这里定义的是8888

nc -lk 8888
           
2.5.2 编写和运行程序
package day08.streaming

import org.apache.hadoop.hdfs.server.common.Storage
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/*
 * @Description:    使用Streaming实现Wordcount
 * ClassName        StreamingWC
 * @Author:         WCH
 * @CreateDate:     2019/1/9$ 16:21$
 * @Version:        1.0
*/
object StreamingWC {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("StreamingWC").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // 创建Streaming的上下文
    // 使用类名直接创建
    val ssc = new StreamingContext(sc, Durations.seconds(5))
    // new StreamingContext(sc,Seconds(5))

    // 获取netcat的数据, 这种方式, Streaming会把获取的数据以缓存的方式放到指定缓存级别的地方
    val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop02",8888,StorageLevel.MEMORY_AND_DISK)

    // 开始分析数据
    val res: DStream[(String, Int)] = dStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

    // 输出结果到控制台
    res.print()

    // 开始提交任务到集群
    ssc.start()
    // 线程等待, 等待处理下一批次任务
    ssc.awaitTermination()

  }
}
           
2.5.3 输出结果

在nc服务下输入字符

spark08--kafka组件,面试题,常用命令,可视化,api,Streaming简介,DStream,nc服务,Streaming实现Wordcount

在控制台事实显示计算结果

spark08--kafka组件,面试题,常用命令,可视化,api,Streaming简介,DStream,nc服务,Streaming实现Wordcount

继续阅读