
Big Data Learning: Flume + Kafka + Spark Streaming

1. Set up the Kafka environment:

For reference, see

https://blog.csdn.net/weixin_37835915/article/details/103786157

(1) Start ZooKeeper

(2) Start Kafka

(3) Create a topic

(4) Start a consumer (example commands for these four steps are sketched after this list)
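
The exact commands depend on the Kafka version and install layout; the following is a minimal sketch, assuming you run from the Kafka installation directory, use the default ports (ZooKeeper on 2181, the broker on 9092), and name the topic flume to match the Flume configuration below. On newer Kafka versions, topic creation takes --bootstrap-server localhost:9092 instead of --zookeeper.

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flume
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flume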

2. Set up the Flume environment:

http://www.apache.org/dyn/closer.lua/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

(1) Unpack the archive and add an example.conf file under the conf directory with the following contents:

# Name the components of this agent; a1 is the agent's name

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe and configure the source component r1

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444


# Describe and configure the sink component k1 (a Kafka sink)

a1.sinks.k1.channel = c1

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic = flume

a1.sinks.k1.kafka.bootstrap.servers = localhost:9092

a1.sinks.k1.kafka.flumeBatchSize = 20

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

a1.sinks.k1.kafka.producer.compression.type = snappy


# Describe and configure the channel component; an in-memory channel is used here

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Wire the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1
           

(2) Start Flume: flume-ng agent --conf ../conf --conf-file ../conf/example.conf --name a1 -Dflume.root.logger=INFO,console

In general: flume-ng agent --conf "<directory containing the config file>" --conf-file "<config file>" --name "<agent name defined in the config file>"

If the following line appears, the agent started successfully: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]

(3) Run telnet localhost 44444 and type hello world; if the line shows up in Flume, the source is working (see the end-to-end check sketched below).
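
To verify the whole Flume → Kafka path (assuming the topic name flume and the default broker port used above), keep a console consumer running in another terminal; each line typed into the telnet session should then appear there:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flume --from-beginning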

3. The Spark Streaming code:

package com.spark.self


import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.DStream

object WordCountSprakStreaming {
  val numThreads = 1
  //  val topics = "test"
  //  val topics = "sparkStreamingTest"
  val topics = "flume"
  val zkQuorum = "localhost:2181"
  val group = "consumer1"
  val brokers = "localhost:9092"

  def main(args: Array[String]): Unit = {
    //    receiver
    direct
  }

  def service(): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // use a checkpoint directory to store the streaming state
    ssc.checkpoint("/out")
    // create the Kafka input stream
    // mode 1: the receiver-based approach
    val topic = Map(topics -> numThreads)
    // createStream is the receiver-based API; it connects to Kafka through ZooKeeper
    val data = KafkaUtils.createStream(ssc, "localhost:2181", "mygroup", topic, StorageLevel.MEMORY_AND_DISK)
    // stateful word-count accumulation across batches
    val updateFunc = (curVal: Seq[Int], preVal: Option[Int]) => {
      // add the counts from the current batch to the previous total
      val total = curVal.sum
      // the previous total defaults to 0 on the first batch
      val previous = preVal.getOrElse(0)
      // Some wraps the updated state
      Some(total + previous)
    }
    data.map(_._2).flatMap(_.split(" ")).map(word => (word, 1)).updateStateByKey(updateFunc).print()
    // start the streaming context
    ssc.start()
    ssc.awaitTermination()
  }

  def receiver() = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("kafka test").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("/out")
    val topicMap = topics.split(",").map((_, numThreads)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val updateFunc = (curVal: Seq[Int], preVal: Option[Int]) => {
      // add the counts from the current batch to the previous total
      val total = curVal.sum
      // the previous total defaults to 0 on the first batch
      val previous = preVal.getOrElse(0)
      // Some wraps the updated state
      Some(total + previous)
    }
    val words = lines.flatMap(_.split(" ")).map(x => (x, 1))
    words.reduceByKey(_ + _).updateStateByKey(updateFunc).print()
    ssc.start()
    ssc.awaitTermination()
  }


  def direct() = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    val conf = new SparkConf().setMaster("local[2]").setAppName("kafka test")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("/out")
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" ")).map(x => (x, 1))
    val updateFunc = (curVal: Seq[Int], preVal: Option[Int]) => {
      // add the counts from the current batch to the previous total
      val total = curVal.sum
      // the previous total defaults to 0 on the first batch
      val previous = preVal.getOrElse(0)
      // Some wraps the updated state
      Some(total + previous)
    }
    words.reduceByKey(_ + _).updateStateByKey(updateFunc).print()
    ssc.start()
    ssc.awaitTermination()
  }

}
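
The code relies on the old Spark Streaming Kafka 0.8 integration (KafkaUtils.createStream / createDirectStream with kafka.serializer.StringDecoder), which was removed in Spark 3.0 and is not published for Scala 2.12. A minimal build.sbt sketch is shown below; the version numbers are assumptions and should be matched to your local Spark installation.

scalaVersion := "2.11.12"

val sparkVersion = "2.4.4" // assumed; use the version of your local Spark

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  // provides org.apache.spark.streaming.kafka.KafkaUtils and kafka.serializer.StringDecoder
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVersion
)

With the Flume agent and telnet session from step 2 active, running the object should print word counts such as (hello,1) to the console every 10-second batch.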

           

For more on Flume concepts and configuration, see

https://blog.csdn.net/weixin_37835915/article/details/103184553