Real-time Kafka consumption with Spark Streaming

Straight to the complete code, written in Scala.

import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import com.typesafe.config.{Config, ConfigFactory}
import xxxxxx.JsonUtil

import scala.collection._
import kafka.serializer.StringDecoder



/**
 * @author iris_new
 */

// Extend Spark's Logging trait (Spark 1.x) for convenient log output
object Example extends Logging {

    def startJob(args: Array[String]) {
      // Typesafe Config reads the app.conf file directly, so configuration
      // changes do not require code changes
      val appConf = ConfigFactory.load("app.conf")
      val streamConf = appConf.getConfig("streaming")

      // Recover the job context from the checkpoint, or create a new one.
      // The SparkContext is created inside the factory function so that a
      // context recovered from the checkpoint does not clash with a
      // pre-existing one.
      def functionToCreateContext(): StreamingContext = {
          val sc = new SparkContext(new SparkConf().setAppName(appConf.getString("name")))
          val context = new StreamingContext(sc, Seconds(streamConf.getInt("duration")))

          // Business logic
          doXxx(appConf, sc, context)

          // Set the checkpoint directory (an HDFS path in my case)
          context.checkpoint(streamConf.getString("checkpointDir"))
          context
      }
      val ssc = StreamingContext.getOrCreate(streamConf.getString("checkpointDir"), functionToCreateContext)

      ssc.start()
      ssc.awaitTermination()
    }

    def doXxx(appConf: Config, sc: SparkContext, ssc: StreamingContext) {

      // Kafka configuration
      // The kafka section of my app.conf looks like this:
      /**
       * kafka {
       *     brokers = "22.2.22.22:9092,22.2.22.23:9092,22.2.22.24:9092,22.2.22.25:9092"
       *     topics = "example"
       *     offset = "largest"
       * }
       */
      val kafkaConf = appConf.getConfig("kafka")
      val topics = kafkaConf.getString("topics")
      val brokers = kafkaConf.getString("brokers")
      val offset = kafkaConf.getString("offset")
      val topicSet = topics.split(",").toSet

      // Read data from Kafka; the direct approach needs the broker list and an offset reset policy
      val kafkaParams = immutable.Map[String, String]("metadata.broker.list" -> brokers, "auto.offset.reset" -> offset)
      // Create a DirectStream with org.apache.spark.streaming.kafka.KafkaUtils and keep only the message value
      val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet).map(_._2)

      // Keep only messages whose payload contains "code":"aaa"
      val minuteLines = lines.filter(line => line.indexOf("\"code\":\"aaa\"") >= 0)
        .map(message => {
          // Parse the message into JSON; JsonUtil (not shown here) returns a Map[String, Any]
          JsonUtil.read(message)
        })

      minuteLines.foreachRDD(rdd => {
        rdd.foreach { jsonData =>
          val code = jsonData("code").asInstanceOf[String]
          val name = jsonData("name").asInstanceOf[String]
          // do something with code and name
        }
      })

    }

    def main(args: Array[String]) {
      startJob(args)
    }
}
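
For reference, here is a sketch of a complete app.conf consistent with the keys the code reads. The kafka block is the one from the code comment; name, duration, and checkpointDir are placeholder values to adapt to your environment:

# Sketch of an app.conf matching the keys read above.
# name, duration, and checkpointDir are placeholders.
name = "spark-kafka-example"
streaming {
    duration = 60
    checkpointDir = "hdfs:///tmp/example/checkpoint"
}
kafka {
    brokers = "22.2.22.22:9092,22.2.22.23:9092,22.2.22.24:9092,22.2.22.25:9092"
    topics = "example"
    offset = "largest"
}

The JsonUtil imported by the code is not shown in the original. As a hypothetical stand-in (not the author's implementation; any JSON library that returns a Map[String, Any] works), a minimal version based on json4s could look like this:

import org.json4s._
import org.json4s.jackson.JsonMethods

// Hypothetical replacement for the omitted JsonUtil; assumes
// json4s-jackson is on the classpath
object JsonUtil {
  implicit val formats: Formats = DefaultFormats

  // Parse a JSON string into a Map[String, Any], as the streaming job expects
  def read(message: String): Map[String, Any] =
    JsonMethods.parse(message).extract[Map[String, Any]]
}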
           

The Maven dependency for the Typesafe Config library used above:

<dependency>
    <groupId>com.typesafe</groupId>
    <artifactId>config</artifactId>
    <version>1.2.1</version>
</dependency>
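
The code also needs the Spark Streaming Kafka integration on the classpath. For a Spark 1.x build with Scala 2.10 (which matches the Logging trait and StringDecoder API used here), the dependency would look roughly like this; the version is an assumption and should match your Spark distribution:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <!-- assumption: pick the version matching your Spark build -->
    <version>1.6.3</version>
</dependency>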
           

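Finally, assuming the project is packaged into an assembly jar, the job could be launched with spark-submit along these lines (master, jar name, and resources are placeholders):

# Hypothetical launch command; the class name matches the Example object above
spark-submit \
  --class Example \
  --master yarn \
  example-streaming-assembly.jar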