Producer class (Java)
package cn.kgc.stock;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class KB12StockProducer {
    public static void main(String[] args) {
        // Simulate stock trade records:
        // the Long key is the trade's sequence number, the Float value is the trade amount
        final Properties CONF = new Properties();
        CONF.setProperty("bootstrap.servers", "192.168.6.130:9092");
        CONF.setProperty("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
        CONF.setProperty("value.serializer", "org.apache.kafka.common.serialization.FloatSerializer");
        CONF.setProperty("retries", "2");
        CONF.setProperty("acks", "1");
        CONF.setProperty("batch.size", "10");
        CONF.setProperty("linger.ms", "500");

        final String TOPIC = "stock_01";
        final int PARTITION = 0;

        KafkaProducer<Long, Float> producer = new KafkaProducer<>(CONF);
        Random rand = new Random();
        long count = 0;
        try {
            while (true) {
                float value = 100 + rand.nextFloat();
                // Send the same value that is printed below
                // (the original generated a second random number here, so the log did not match the record)
                Future<RecordMetadata> send = producer.send(
                        new ProducerRecord<>(TOPIC, PARTITION, ++count, value));
                RecordMetadata rmd = send.get();
                System.out.println(rmd.topic() + "\t" + rmd.partition() + "\t" + rmd.offset() + "\t" + count + "->" + value);
                // Pause 20–999 microseconds between trades
                TimeUnit.MICROSECONDS.sleep(20 + rand.nextInt(980));
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
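The producer writes to partition 0 of stock_01, so the topic should exist before the producer starts (otherwise it is only created with broker defaults if auto topic creation is enabled). A minimal sketch that creates it with Kafka's AdminClient, assuming a single-broker dev setup where 1 partition and replication factor 1 are enough (the object name CreateStockTopic and those numbers are illustrative assumptions):

package cn.kgc.stock

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

object CreateStockTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "192.168.6.130:9092")
    val admin = AdminClient.create(props)
    // 1 partition, replication factor 1 -- assumed values for a single-broker dev cluster
    val topic = new NewTopic("stock_01", 1, 1.toShort)
    // Block until the topic has actually been created
    admin.createTopics(Collections.singleton(topic)).all().get()
    admin.close()
  }
}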
Spark Streaming consumer (Scala)
package cn.kgc.stock

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object StockSparkStream {
  def main(args: Array[String]): Unit = {
    // Kafka consumer parameters; deserializers must match the producer's serializers
    val conf = mutable.Map[String, String]()
    conf.put("bootstrap.servers", "192.168.6.130:9092")
    conf.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer")
    conf.put("value.deserializer", "org.apache.kafka.common.serialization.FloatDeserializer")
    conf.put("group.id", "stock_kb12")
    conf.put("enable.auto.commit", "true")
    conf.put("auto.offset.reset", "earliest")

    val topics = Array("stock_01")
    val spark_conf = new SparkConf().setMaster("local[*]").setAppName("kb12_stock")
    // Use time as the batching granularity: one micro-batch every 3 seconds
    val stream_context = new StreamingContext(spark_conf, Seconds(3))
    // PreferConsistent spreads partitions evenly across executors;
    // PreferBrokers is only appropriate when executors run on the broker hosts
    val stream: InputDStream[ConsumerRecord[Long, Float]] = KafkaUtils.createDirectStream(
      stream_context,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[Long, Float](topics, conf))

    stream.foreachRDD(rdd => {
      val value: Array[Float] = rdd.map(_.value()).collect()
      // Skip empty micro-batches to avoid a meaningless 0/0 average
      if (value.nonEmpty) {
        val avg = value.sum / value.size
        println(s"${value.mkString(",")}\t$avg")
      }
    })

    stream_context.start()
    stream_context.awaitTermination()
  }
}
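collect() pulls every record of the micro-batch onto the driver, which is fine for a demo but does not scale with the batch size. The same per-batch average can be computed on the executors so that only a (sum, count) pair travels back to the driver. A sketch under that assumption, reusing the stream value from the code above (the names sum and cnt are illustrative):

    // Alternative to collect(): aggregate on the executors, then divide on the driver
    stream.foreachRDD(rdd => {
      val (sum, cnt) = rdd
        .map(r => (r.value().toDouble, 1L))
        .fold((0.0, 0L))((a, b) => (a._1 + b._1, a._2 + b._2))
      if (cnt > 0) println(s"count=$cnt\tavg=${sum / cnt}")
    })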