Project requirements:
Use Flume to monitor a directory of files and load them into Kafka, read the data from Kafka into Spark Streaming, apply a simple transformation there, and write the result back to Kafka.
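If automatic topic creation is disabled on the broker, the source and output topics used below (userpro and userpro2) can be created up front. A hedged sketch with the standard Kafka CLI; the --bootstrap-server form assumes Kafka 2.2 or later (older releases use --zookeeper instead), and the partition/replication counts are placeholders:
kafka-topics.sh --create --bootstrap-server 192.168.56.101:9092 --topic userpro --partitions 1 --replication-factor 1
kafka-topics.sh --create --bootstrap-server 192.168.56.101:9092 --topic userpro2 --partitions 1 --replication-factor 1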
File format
The input is a CSV file; each user row carries a one-to-many list of friends, and the goal is to flatten that list so that every output row is a single one-to-one (user, friend) pair. A sketch of the assumed layout follows.
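A minimal sketch of the assumed layout, based on the header-filter regex ^user.* in the Flume config and the split-on-comma / split-on-space logic in the streaming job (the ids u1, f1, ... are placeholders):
user,friends
u1,f1 f2 f3
u2,f4 f5
After flattening, each output record is a single (user, friend) pair: (u1,f1), (u1,f2), (u1,f3), (u2,f4), (u2,f5).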
Flume: monitor the directory and write the files into Kafka
a6.sources=s6
a6.channels=c6
a6.sinks=k6
a6.sources.s6.type=spooldir
a6.sources.s6.spoolDir=/opt/data
a6.sources.s6.interceptors=head_filter
a6.sources.s6.interceptors.head_filter.type=regex_filter
a6.sources.s6.interceptors.head_filter.regex=^user.*
a6.sources.s6.interceptors.head_filter.excludeEvents=true
# A single line of data can be very long; Flume's default maximum line length is 2048, so raise the limit with this setting
a6.sources.s6.deserializer.maxLineLength=60000
a6.channels.c6.type=memory
a6.channels.c6.capacity=10000
a6.channels.c6.transactionCapacity=1000
a6.sinks.k6.type=org.apache.flume.sink.kafka.KafkaSink
a6.sinks.k6.kafka.bootstrap.servers=192.168.56.101:9092
a6.sinks.k6.kafka.topic=userpro
a6.sinks.k6.channel=c6
a6.sources.s6.channels=c6
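Assuming the configuration above is saved as /opt/flume/conf/kafka-spool.conf (the path and file name are placeholders), the agent can be started with the standard flume-ng launcher:
flume-ng agent --name a6 --conf /opt/flume/conf --conf-file /opt/flume/conf/kafka-spool.conf -Dflume.root.logger=INFO,console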
Spark Streaming: read the Kafka data, apply the simple flattening step (one friend per user per record), and write the result back to Kafka
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Serializable wrapper that owns one Kafka producer per executor.
// Only the factory function is serialized and shipped to the executors;
// the producer itself is created lazily on first use.
class KafkaSinks[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V) =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V) =
    producer.send(new ProducerRecord[K, V](topic, value))
}
object KafkaSinks {
  import scala.collection.JavaConversions._ // implicit Scala Map -> java.util.Map for the producer constructor

  def apply[K, V](conf: Map[String, String]): KafkaSinks[K, V] = {
    val createProducer = () => {
      // Created on the executor on first use; closed when the JVM shuts down
      val producer = new KafkaProducer[K, V](conf)
      sys.addShutdownHook {
        producer.close()
      }
      producer
    }
    new KafkaSinks[K, V](createProducer)
  }
}
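As a quick sanity check outside of Spark, the wrapper can also be used directly on the driver; a minimal sketch reusing the broker address from this project:
val sink = KafkaSinks[String, String](Map(
  "bootstrap.servers" -> "192.168.56.101:9092",
  "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
  "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer"
))
sink.send("userpro2", "hello")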
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast

// Lazily initialized singleton (double-checked locking) that broadcasts the Kafka sink once
object MySingleBaseDAO {
  @volatile private var instance: Broadcast[KafkaSinks[String, String]] = null

  def getInstance(): Broadcast[KafkaSinks[String, String]] = {
    if (instance == null) {
      // getOrCreate returns the SparkContext already running in the driver
      val conf = new SparkConf().setMaster("local[2]").setAppName("writeKafka")
      val sc = SparkContext.getOrCreate(conf)
      synchronized {
        if (instance == null) {
          val kafkaParam = Map[String, String](
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.56.101:9092",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName
          )
          // Broadcast the serializable wrapper so every executor reuses a single producer
          instance = sc.broadcast(KafkaSinks[String, String](kafkaParam))
        }
      }
    }
    instance
  }
}
- Apply the flattening logic to the data and write the results out through the broadcast producer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object Test01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("mytest")
    val sc = SparkContext.getOrCreate(conf)
    val ssc = new StreamingContext(sc, Seconds(10))

    // Consumer settings for the source topic written by Flume
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.56.101:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "userpro",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
      ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG -> "20000",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )

    val stream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("userpro"), kafkaParams))

    // Flatten "user,friend1 friend2 ..." into one (user, friend) pair per record
    val res = stream.map(_.value()).filter(line => line.split(",").size > 1).flatMap(m => {
      val userStr = m.split(",")
      userStr(1).split(" ").map(k => (userStr(0), k))
    })
    // res.print()

    // Write every pair to the output topic through the broadcast producer wrapper
    res.foreachRDD(rdd => {
      val producer = MySingleBaseDAO.getInstance().value
      rdd.foreach(x => producer.send("userpro2", x.toString()))
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
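To verify the end-to-end flow, the flattened pairs arriving in the output topic can be inspected with the console consumer shipped with Kafka:
kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092 --topic userpro2 --from-beginning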