
Spark project in practice: Flume -> Kafka -> Spark Streaming -> Kafka

Project requirements:

Use Flume to monitor the file and load it into Kafka, read it from Kafka into Spark Streaming, apply a simple transformation in Spark Streaming, and write the result back into Kafka.

File format

In the CSV file, each row holds a user and that user's friends; the relationship is one-to-many, with all friends packed into a single space-separated field. The goal is to expand it into a one-to-one table with one row per (user, friend) pair, as sketched below.
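A rough illustration of the flattening (the IDs below are made up, not taken from the real file); a minimal Scala sketch:

// Hypothetical input line in the user,friends layout described above
val line = "1000001,2000001 2000002 2000003"

// Split off the user, then expand the space-separated friend list
val Array(user, friends) = line.split(",")
val pairs = friends.split(" ").map(friend => (user, friend))

pairs.foreach(println)
// Prints:
// (1000001,2000001)
// (1000001,2000002)
// (1000001,2000003)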


Flume monitors the file and loads it into Kafka

a6.sources=s6
a6.channels=c6
a6.sinks=k6

a6.sources.s6.type=spooldir
a6.sources.s6.spoolDir=/opt/data
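# The regex_filter interceptor below drops the CSV header line (it starts with "user"),
# since excludeEvents=true discards events that match the regex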
a6.sources.s6.interceptors=head_filter
a6.sources.s6.interceptors.head_filter.type=regex_filter
a6.sources.s6.interceptors.head_filter.regex=^user.*
a6.sources.s6.interceptors.head_filter.excludeEvents=true
# A single record here is very long and Flume's default maximum line length is 2048, so raise the limit
a6.sources.s6.deserializer.maxLineLength=60000

a6.channels.c6.type=memory
a6.channels.c6.capacity=10000
a6.channels.c6.transactionCapacity=1000

a6.sinks.k6.type=org.apache.flume.sink.kafka.KafkaSink
a6.sinks.k6.kafka.bootstrap.servers=192.168.56.101:9092
a6.sinks.k6.kafka.topic=userpro


a6.sinks.k6.channel=c6
a6.sources.s6.channels=c6
           

Spark Streaming reads the Kafka data, does a simple transformation (flattening: one row per user-friend pair), and writes it back to Kafka

// A serializable wrapper that creates the Kafka connection lazily: only the factory
// function is shipped to the executors, and the actual KafkaProducer is built on
// first use on each executor.
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class KafkaSinks[K, V](fc: () => KafkaProducer[K, V]) extends Serializable {
  lazy val producer = fc()

  def send(topic: String, key: K, value: V) = {
    producer.send(new ProducerRecord[K, V](topic, key, value))
  }

  def send(topic: String, value: V) = {
    producer.send(new ProducerRecord[K, V](topic, value))
  }
}

object KafkaSinks {
  import scala.collection.JavaConversions._

  def apply[K, V](conf: Map[String, String]): KafkaSinks[K, V] = {
    val func = () => {
      val prod = new KafkaProducer[K, V](conf)
      // Close the producer when the JVM shuts down
      sys.addShutdownHook {
        prod.close()
      }
      prod
    }
    new KafkaSinks[K, V](func)
  }
}
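Outside of Spark, the wrapper can be exercised on its own to see the lazy creation in action. A minimal sketch, reusing the broker address from this post (the topic name and message are made up for illustration):

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

val params = Map[String, String](
  ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.56.101:9092",
  ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
  ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName
)

val sink = KafkaSinks[String, String](params)
// The KafkaProducer is only created here, when send is first called
sink.send("test_topic", "(someUser,someFriend)")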
           
// Lazily-initialized singleton (double-checked locking) that broadcasts one
// KafkaSinks instance so every executor reuses the same wrapper.
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast

object MySingleBaseDAO {
  @volatile private var instance: Broadcast[KafkaSinks[String, String]] = null

  def getInstance() = {
    if (instance == null) {
      val conf = new SparkConf().setMaster("local[2]").setAppName("writeKafka")
      // Reuses the SparkContext already created by the streaming job, if any
      val sc = SparkContext.getOrCreate(conf)
      synchronized {
        if (instance == null) {
          val kafkaParam = Map[String, String](
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.56.101:9092",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName
          )
          instance = sc.broadcast(KafkaSinks[String, String](kafkaParam))
        }
        instance
      }
    }
    instance
  }
}
           
  • Apply the transformation logic to the stream and use the broadcast producer to write the result out
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object Test01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("mytest")
    val sc = SparkContext.getOrCreate(conf)

    val ssc = new StreamingContext(sc, Seconds(10))
    // Consumer settings for the userpro topic written by Flume
    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.56.101:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "userpro",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
      ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG -> "20000",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )

    val stream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("userpro"), kafkaParams))

    // Flatten each "user,friend1 friend2 ..." line into one (user, friend) pair per friend
    val res = stream.map(_.value()).filter(line => line.split(",").size > 1).flatMap(m => {
      val userStr = m.split(",")
      userStr(1).split(" ").map(k => (userStr(0), k))
    })
//    res.print()

    // Send every flattened pair to the userpro2 topic through the broadcast KafkaSinks wrapper
    res.foreachRDD(rdd => {
      val producers = MySingleBaseDAO.getInstance().value
      rdd.foreach(x => producers.send("userpro2", x.toString()))
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
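To check that the flattened records actually land in the userpro2 topic, a small standalone consumer can be pointed at the same broker. This is only a sketch: the group id userpro2_check is made up, and it assumes a Kafka client new enough to offer poll(Duration) (older clients take a long timeout).

import java.time.Duration
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

import scala.collection.JavaConverters._

object CheckUserpro2 {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "userpro2_check")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("userpro2"))

    // Poll a few times and print whatever (user,friend) records have arrived so far
    for (_ <- 1 to 5) {
      val records = consumer.poll(Duration.ofSeconds(2))
      records.asScala.foreach(r => println(r.value()))
    }
    consumer.close()
  }
}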

           
