在這篇文章裡,我們模拟了一個場景,實時分析訂單資料,統計實時收益。
場景模拟
我試圖覆寫工程上最為常用的一個場景:
1)首先,向Kafka裡實時的寫入訂單資料,JSON格式,包含訂單ID-訂單類型-訂單收益
2)然後,spark-streaming每十秒實時去消費kafka中的訂單資料,并以訂單類型分組統計收益
3)最後,spark-streaming統計結果實時的存入本地MySQL。
前提條件
安裝
1)spark:我使用的yarn-client模式下的spark,環境中叢集用戶端已經搞定
2)zookeeper:我使用的是這個叢集:10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181
3)kafka:我使用的是standalone模式:10.93.21.21:9093
4)mysql:10.93.84.53:3306
語言
python:pykafka,pip install pykafka
java:spark,spark-streaming
下面開始
1、資料寫入kafka
- kafka寫入
我們使用pykafka模拟資料實時寫入,代碼如下:
kafka_producer.py
# -* coding:utf8 *-
import time
import json
import uuid
import random
import threading
from pykafka import KafkaClient
# 建立kafka執行個體
hosts = '10.93.21.21:9093'
client = KafkaClient(hosts=hosts)
# 列印一下有哪些topic
print client.topics
# 建立kafka producer句柄
topic = client.topics['kafka_spark']
producer = topic.get_producer()
# work
def work():
while 1:
msg = json.dumps({
"id": str(uuid.uuid4()).replace('-', ''),
"type": random.randint(1, 5),
"profit": random.randint(13, 100)})
producer.produce(msg)
# 多線程執行
thread_list = [threading.Thread(target=work) for i in range(10)]
for thread in thread_list:
thread.setDaemon(True)
thread.start()
time.sleep(60)
# 關閉句柄, 退出
producer.stop()
複制
可以看到,我們寫入的形式是一個json,訂單id是一個uuid,訂單類型type從1-5随機,訂單收益profit從13-100随機,形如
{"id": ${uid}, "type": 1, "profit": 30}
複制
注意:1)python對kafka的讀寫不需要借助zookeeper,2)使用多線程的形式寫入,讓資料量具有一定的規模。
執行producer,會持續寫入資料1分鐘。
python kafka_producer.py
複制
- 驗證一下
kafka_consumer.py
# -* coding:utf8 *-
from pykafka import KafkaClient
hosts = '10.93.21.21:9093'
client = KafkaClient(hosts=hosts)
# 消費者
topic = client.topics['kafka_spark']
consumer = topic.get_simple_consumer(consumer_group='test', auto_commit_enable=True, auto_commit_interval_ms=1,
consumer_id='test')
for message in consumer:
if message is not None:
print message.offset, message.value
複制
執行,可以消費kafka剛才寫入的資料
python kafka_consumer.py
複制
2、spark-streaming
1)先解決依賴
其中比較核心的是spark-streaming和kafka內建包spark-streaming-kafka_2.10,還有spark引擎spark-core_2.10
json和mysql看大家愛好。
pom.xml
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.19</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>1.4</version>
</dependency>
</dependencies>
複制
2)MySQL準備
建表
我們的結果去向是MySQL,先建立一個結果表。
id:主鍵,自增id
type:訂單類型
profit:每個spark batch聚合出的訂單收益結果
time:時間戳
CREATE TABLE `order` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`type` int(11) DEFAULT NULL,
`profit` int(11) DEFAULT NULL,
`time` mediumtext,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=56 DEFAULT CHARSET=utf8
複制
- Java用戶端
采用了單例線程池的模式簡單寫了一下。
ConnectionPool.java
package com.xiaoju.dqa.realtime_streaming;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;
public class ConnectionPool {
private static LinkedList<Connection> connectionQueue;
static {
try {
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
public synchronized static Connection getConnection() {
try {
if (connectionQueue == null) {
connectionQueue = new LinkedList<Connection>();
for (int i = 0; i < 5; i++) {
Connection conn = DriverManager.getConnection(
"jdbc:mysql://10.93.84.53:3306/big_data",
"root",
"1234");
connectionQueue.push(conn);
}
}
} catch (Exception e) {
e.printStackTrace();
}
return connectionQueue.poll();
}
public static void returnConnection(Connection conn){connectionQueue.push(conn);}
}
複制
3)代碼實作
我用java寫的,不會用scala很尴尬。
即時用java整個的處理過程依然比較簡單。跟常見的wordcount也沒有多大的差别。
SparkStreaming特點
spark的特點就是RDD,通過對RDD的操作,來屏蔽分布式運算的複雜度。
而spark-streaming的操作對象是RDD的時間序列DStream,這個序列的生成是跟batch的選取有關。例如我這裡Batch是10s一個,那麼每隔10s會産出一個RDD,對RDD的切割和序列的生成,spark-streaming對我們透明了。唯一暴露給我們的DStream和原生RDD的使用方式基本一緻。
這裡需要講解一下MySQL寫入注意的事項。
MySQL寫入
在處理mysql寫入時使用了foreachPartition方法,即,在foreachPartition中使用borrow mysql句柄。
這樣做的原因是:
1)你無法再Driver端建立mysql句柄,并通過序列化的形式發送到worker端
2)如果你在處理rdd中建立mysql句柄,很容易對每一條資料建立一個句柄,在處理過程中很快記憶體就會溢出。
OrderProfitAgg.java
package com.xiaoju.dqa.realtime_streaming;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
import java.sql.Connection;
import java.sql.Statement;
import java.util.*;
/*
* 生産者可以選用kafka自帶的producer腳本
* bin/kafka-console-producer.sh --broker-list localhost:9093 --topic test
* */
public class OrderProfitAgg {
public static void main(String[] args) throws InterruptedException {
/*
* kafka所注冊的zk叢集
* */
String zkQuorum = "10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181";
/*
* spark-streaming消費kafka的topic名稱, 多個以逗号分隔
* */
String topics = "kafka_spark,kafka_spark2";
/*
* 消費組 group
* */
String group = "bigdata_qa";
/*
* topic的分區數
* */
int numThreads = 2;
/*
* 選用yarn隊列模式, spark-streaming程式的app名稱是"order profit"
* */
SparkConf sparkConf = new SparkConf().setMaster("yarn-client").setAppName("order profit");
/*
* 建立sc, 全局唯一, 設定logLevel可以列印一些東西到控制台
* */
JavaSparkContext sc = new JavaSparkContext(sparkConf);
sc.setLogLevel("WARN");
/*
* 建立jssc, spark-streaming的batch是每10s劃分一個
* */
JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10));
/*
* 準備topicMap
* */
Map<String ,Integer> topicMap = new HashMap<String, Integer>();
for (String topic : topics.split(",")) {
topicMap.put(topic, numThreads);
}
/*
* kafka資料流
* */
List<JavaPairReceiverInputDStream<String, String>> streams = new ArrayList<JavaPairReceiverInputDStream<String, String>>();
for (int i = 0; i < numThreads; i++) {
streams.add(KafkaUtils.createStream(jssc, zkQuorum, group, topicMap));
}
/*
* 從kafka消費資料的RDD
* */
JavaPairDStream<String, String> streamsRDD = streams.get(0);
for (int i = 1; i < streams.size(); i++) {
streamsRDD = streamsRDD.union(streams.get(i));
}
/*
* kafka消息形如: {"id": ${uuid}, "type": 1, "profit": 35}
* 統計結果, 以type分組的總收益
* mapToPair, 将kafka消費的資料, 轉化為type-profit key-value對
* reduceByKey, 以type分組, 聚合profit
* */
JavaPairDStream<Integer, Integer> profits = streamsRDD.mapToPair(new PairFunction<Tuple2<String, String>, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<String, String> s_tuple2) throws Exception {
JSONObject jsonObject = JSON.parseObject(s_tuple2._2);
return new Tuple2<Integer, Integer>(jsonObject.getInteger("type"), jsonObject.getInteger("profit"));
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});
/*
* 輸出結果到MySQL
* 需要為每一個partition建立一個MySQL句柄, 使用foreachPartition
* */
profits.foreachRDD(new Function<JavaPairRDD<Integer, Integer>, Void>() {
@Override
public Void call(JavaPairRDD<Integer, Integer> integerIntegerJavaPairRDD) throws Exception {
integerIntegerJavaPairRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<Integer, Integer>>>() {
@Override
public void call(Iterator<Tuple2<Integer, Integer>> tuple2Iterator) throws Exception {
Connection connection = ConnectionPool.getConnection();
Statement stmt = connection.createStatement();
long timestamp = System.currentTimeMillis();
while(tuple2Iterator.hasNext()) {
Tuple2<Integer, Integer> tuple = tuple2Iterator.next();
Integer type = tuple._1;
Integer profit = tuple._2;
String sql = String.format("insert into `order` (`type`, `profit`, `time`) values (%s, %s, %s)", type, profit, timestamp);
stmt.executeUpdate(sql);
}
ConnectionPool.returnConnection(connection);
}
});
return null;
}
});
/*
* 開始計算, 等待計算結束
* */
jssc.start();
try {
jssc.awaitTermination();
} catch (Exception ex) {
ex.printStackTrace();
} finally {
jssc.close();
}
}
}
複制
4)打包方法
編寫pom.xml build tag。
mvn clean package打包即可。
pom.xml
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<!--這裡要替換成jar包main方法所在類 -->
<!--<mainClass>com.bigdata.qa.hotdog.driver.WordCount</mainClass>-->
<mainClass>com.xiaoju.dqa.realtime_streaming.OrderProfitAgg</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- 指定在打包節點執行jar包合并操作 -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
複制
3、執行與結果
1)執行kafka_producer.py
python kafka_producer.py
複制
2) 執行spark-streaming
這裡使用的是預設參數送出yarn隊列。
spark-submit --queue=root.XXXX realtime-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar
複制
3)檢視結果
到MySQL中檢視結果,每隔10秒會聚合出type=1-5的5條資料。
例如第一條資料,就是type=4這種類型的業務,在10s内收益是555473元。業務量驚人啊。哈哈。
