Spark Streaming real-time stream: Kafka producer (Java) and Spark Streaming consumer (Scala)

Producer class (Java)

package cn.kgc.stock;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class KB12StockProducer {
    public static void main(String[] args) {
        // Simulate stock trade records:
        // key (Long) = trade sequence number, value (Float) = trade amount

        final Properties CONF = new Properties();
        CONF.setProperty("bootstrap.servers", "192.168.6.130:9092");
        CONF.setProperty("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
        CONF.setProperty("value.serializer", "org.apache.kafka.common.serialization.FloatSerializer");
        CONF.setProperty("retries", "2");
        CONF.setProperty("acks", "1");
        CONF.setProperty("batch.size", "10");
        CONF.setProperty("linger.ms", "500");
        final String TOPIC = "stock_01";
        final int PARTITION = 0;
        KafkaProducer<Long, Float> producer = new KafkaProducer<Long, Float>(CONF);
        Random rand = new Random();
        long count = 0;

        try {
            while (true) {
                float value = 100 + rand.nextFloat();
                // send the same value that is printed below
                Future<RecordMetadata> send = producer.send(new ProducerRecord<Long, Float>(TOPIC, PARTITION, ++count, value));
                RecordMetadata rmd = send.get();
                System.out.println(rmd.topic()+"\t"+rmd.partition()+"\t"+rmd.offset()+"\t"+count+"->"+value);
                // pause 20-1000 microseconds between trades
                TimeUnit.MICROSECONDS.sleep(20 + rand.nextInt(980));
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
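
The producer sends to partition 0 of topic stock_01, so that topic has to exist before the first send unless the broker auto-creates topics. Below is a minimal sketch (Scala) that creates it with the Kafka AdminClient, assuming the same broker address as above; the object name CreateStockTopic is only illustrative:

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

object CreateStockTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "192.168.6.130:9092")
    val admin = AdminClient.create(props)
    try {
      // one partition is enough because the producer always sends to partition 0
      admin.createTopics(Collections.singleton(new NewTopic("stock_01", 1, 1.toShort))).all().get()
      println("topic stock_01 created")
    } finally {
      admin.close()
    }
  }
}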


Spark Streaming consumer (Scala)

package cn.kgc.stock

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object StockSparkStream {
  def main(args: Array[String]): Unit = {
    // Kafka consumer parameters; the deserializers must match the producer's serializers
    val conf = mutable.Map[String, String]()
    conf.put("bootstrap.servers", "192.168.6.130:9092")
    conf.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer")
    conf.put("value.deserializer", "org.apache.kafka.common.serialization.FloatDeserializer")
    conf.put("group.id", "stock_kb12")
    conf.put("enable.auto.commit", "true")
    conf.put("auto.offset.reset", "earliest")
    val topics = Array("stock_01")


    val spark_conf = new SparkConf().setMaster("local[*]").setAppName("kb12_stock")
    // The batch interval controls data granularity by time: one micro-batch every 3 seconds
    val stream_context = new StreamingContext(spark_conf, Seconds(3))

    val stream: InputDStream[ConsumerRecord[Long, Float]] = KafkaUtils.createDirectStream(
      stream_context,
      // PreferConsistent spreads partitions evenly across executors; PreferBrokers only
      // helps when executors run on the same hosts as the Kafka brokers
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[Long, Float](topics, conf))

    stream.foreachRDD(rdd => {
      // pull this batch's trade amounts to the driver (acceptable for small demo batches)
      val values: Array[Float] = rdd.map(_.value()).collect()
      if (values.nonEmpty) {
        val avg = values.sum / values.size
        println(s"${values.mkString(",")}\t$avg")
      }
    })
    stream_context.start()
    stream_context.awaitTermination()
  }
}
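
Collecting every value to the driver is fine for a demo, but the per-batch average can also be computed on the executors so that only the final numbers reach the driver. A minimal sketch of an alternative foreachRDD body, assuming the same stream value defined above:

    // drop-in alternative for the foreachRDD block above: aggregate on the executors
    stream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        val (sum, cnt) = rdd
          .map(record => (record.value().toDouble, 1L))
          .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
        println(s"batch of $cnt trades, average amount: ${sum / cnt}")
      }
    })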

