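1. Introduction
E-MapReduce plans to offer Spark Streaming SQL as a preview feature starting with EMR-3.18.1. Spark Streaming SQL is a further layer on top of Spark Structured Streaming that lets users write Spark streaming analytics in SQL. It transparently benefits from the performance optimizations in Spark SQL. It is also bound by the same restrictions as Spark Structured Streaming: for example, Structured Streaming does not support multiple streaming aggregations (a chain of aggregations on a streaming dataset), so Spark Streaming SQL does not support them either. To get started with EMR Spark Streaming SQL, see:
- EMR Spark Streaming SQL syntax reference
- Spark Streaming SQL Baseline Testing: a validation and testing tool that also works as a getting-started guide
2. Use case
This article uses Spark Streaming SQL to run real-time statistical analysis on the HDFS audit log. The data flow is shown below: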

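The pipeline has four steps:
- Collect the HDFS audit log into LogService.
- Spark Streaming SQL consumes the data from LogService and analyzes it.
- The analysis results are written to a storage system.
- Downstream systems, typically reports or monitoring and alerting, query and use the results from the storage system.
This article focuses on step 2: how to analyze LogService data with Spark Streaming SQL.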
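3. Implementation
In this demo, we write the result data to Kafka.
3.1 Environment preparation
Create two clusters in E-MapReduce: a Hadoop cluster and a Kafka cluster. We won't walk through cluster creation here; if you are not familiar with it, see the help documentation.
3.2 Create the tables
- Create the LogHub source table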
CREATE DATABASE IF NOT EXISTS default;
USE default;
DROP TABLE IF EXISTS loghub_hdfs_auditlog;
CREATE TABLE loghub_hdfs_auditlog
USING loghub
OPTIONS (
sls.project = "${SLS_PROJECT_NAME}",
sls.store = "${SLS_STORE_NAME}",
access.key.id = "${ACCESS_KEY_ID}",
access.key.secret = "${ACCESS_KEY_SECRET}",
endpoint = "${SLS_ENDPOINT}",
zookeeper.connect.address = "${ZOOKEEPER_ADDRESS}");
- Create the Kafka output table
CREATE DATABASE IF NOT EXISTS default;
USE default;
DROP TABLE IF EXISTS kafka_hdfs_auditlog_analysis;
CREATE TABLE kafka_hdfs_auditlog_analysis
USING kafka
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = 'temp_hdfs_auditlog_analysis',
output.mode = 'complete',
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");
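The `${...}` values in the DDL above are placeholders that must be filled in before the statements are submitted. If you keep the DDL in a file, a minimal sketch for filling them in could look like the following. It assumes the values come from environment variables; `render_ddl` and the sample broker and registry addresses are hypothetical. Python's `string.Template` happens to use the same `${NAME}` syntax, so no custom parsing is needed.

```python
import os
from string import Template


def render_ddl(ddl: str, values: dict) -> str:
    """Replace ${NAME} placeholders in a DDL string.

    substitute() raises KeyError when a placeholder has no value, so a
    half-filled statement is never produced by accident. A literal "$"
    in the SQL would have to be written as "$$".
    """
    return Template(ddl).substitute(values)


if __name__ == "__main__":
    ddl = (
        "CREATE TABLE kafka_hdfs_auditlog_analysis\n"
        "USING kafka\n"
        "OPTIONS (\n"
        'kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",\n'
        'kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}");'
    )
    # Hypothetical defaults; in practice export the real values.
    values = {
        "BOOTSTRAP_SERVERS": os.environ.get("BOOTSTRAP_SERVERS", "broker-1:9092"),
        "SCHEMA_REGISTRY_URL": os.environ.get("SCHEMA_REGISTRY_URL", "http://registry:8081"),
    }
    print(render_ddl(ddl, values))
```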
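Notes:
- For each concrete analysis query, create a separate Kafka result table. Each query produces a different result, so each output table has a different schema.
- Based on the schema of the query result, register the Kafka topic schema in the Kafka Schema Registry beforehand.
- Example schema definition: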
{"namespace": "org.apache.spark.emr.baseline.testing",
"type": "record",
"name": "TempResult",
"fields": [
{"name": "avg_ss_quantity", "type": ["double", "null"]},
{"name": "avg_ss_ext_sales_price", "type": [{"type": "bytes", "logicalType": "decimal", "precision": 11, "scale": 6}, "null"]},
{"name": "avg_ss_ext_wholesale_cost", "type": [{"type": "bytes", "logicalType": "decimal", "precision": 11, "scale": 6}, "null"]},
{"name": "sum_ss_ext_wholesale_cost", "type": [{"type": "bytes", "logicalType": "decimal", "precision": 17, "scale": 2}, "null"]}
]
}
Save the schema definition to a file and register it in the Kafka Schema Registry with the script tool:
python ./schema_register.py ${SCHEMA_REGISTRY_URL} ${TOPIC_NAME} ${SCHEMA_FILE}
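The `schema_register.py` script itself is not reproduced here. As a rough, hypothetical equivalent, the sketch below posts a schema file to a Confluent-compatible Schema Registry through its REST API (`POST /subjects/<subject>/versions`). It assumes the registry uses the default TopicNameStrategy, under which a topic's value schema is registered as subject `<topic>-value`; check which subject name your registry and the EMR Kafka sink actually expect.

```python
import json
import sys
import urllib.request


def build_register_request(registry_url: str, topic: str, schema: dict) -> urllib.request.Request:
    """Build (but do not send) a 'register schema' request.

    Assumption: subject = '<topic>-value' (Confluent TopicNameStrategy).
    """
    subject = f"{topic}-value"
    url = f"{registry_url.rstrip('/')}/subjects/{subject}/versions"
    # The registry expects the Avro schema itself as a JSON-encoded string.
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    )


def register(registry_url: str, topic: str, schema_file: str) -> int:
    """Register the schema stored in schema_file and return its schema id."""
    with open(schema_file, encoding="utf-8") as f:
        schema = json.load(f)
    req = build_register_request(registry_url, topic, schema)
    with urllib.request.urlopen(req) as resp:
        # On success the registry answers with {"id": <schema id>}.
        return json.load(resp)["id"]


# Only contacts the registry when called with the three arguments used above.
if __name__ == "__main__" and len(sys.argv) == 4:
    print(register(sys.argv[1], sys.argv[2], sys.argv[3]))
```

Invoked as `python register_schema.py ${SCHEMA_REGISTRY_URL} ${TOPIC_NAME} ${SCHEMA_FILE}` (file name hypothetical), it mirrors the command above.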
3.3 Count each operation type in 5-minute windows
- Register the Kafka topic schema:
{"type":"record",
"name":"TempResult",
"namespace":"org.apache.spark.sql.streaming.test",
"fields":[
{"name":"op","type":["string","null"]},
{"name":"count","type":"long"},
{"name":"window_time","type":
{"type":"record",
"name":"window_time",
"namespace":"org.apache.spark.sql.streaming.test.window_time",
"fields":[
{"name":"start","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]},
{"name":"end","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]}
]
}
}
]
}
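The result schemas in sections 3.3 to 3.5 share the same `window_time` record, which mirrors the `start`/`end` window struct produced by the windowed query; only the leading fields change. A small, hypothetical helper such as `temp_result_schema` can generate each schema file so that the shared part is written only once.

```python
import json


def temp_result_schema(leading_fields: list) -> dict:
    """Build a TempResult Avro schema: leading_fields + the shared window_time record."""
    # Nullable timestamp-millis, as in the schemas registered in this article.
    ts = [{"type": "long", "logicalType": "timestamp-millis"}, "null"]
    window_time = {
        "type": "record",
        "name": "window_time",
        "namespace": "org.apache.spark.sql.streaming.test.window_time",
        "fields": [
            {"name": "start", "type": ts},
            {"name": "end", "type": ts},
        ],
    }
    return {
        "type": "record",
        "name": "TempResult",
        "namespace": "org.apache.spark.sql.streaming.test",
        "fields": leading_fields + [{"name": "window_time", "type": window_time}],
    }


# Schema for section 3.3 (op, count); 3.4 and 3.5 only swap the leading fields.
op_count = temp_result_schema([
    {"name": "op", "type": ["string", "null"]},
    {"name": "count", "type": "long"},
])
print(json.dumps(op_count, indent=2))
```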
- Create a result table named "kafka_hdfs_auditlog_op_count_in_5_mins".
DROP TABLE IF EXISTS kafka_hdfs_auditlog_op_count_in_5_mins;
CREATE TABLE kafka_hdfs_auditlog_op_count_in_5_mins
USING kafka
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "kafka_hdfs_auditlog_op_count_in_5_mins",
output.mode = "append",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "TempResult",
kafka.schema.record.namespace = "org.apache.spark.sql.streaming.test");
- Run the analysis query
SET streaming.query.name=hdfs_auditlog_op_count_in_5_mins;
SET spark.sql.streaming.checkpointLocation.hdfs_auditlog_op_count_in_5_mins=/tmp/spark/sql/streaming/hdfs_auditlog_op_count_in_5_mins;
INSERT INTO kafka_hdfs_auditlog_op_count_in_5_mins
SELECT cmd op, count(*) count, window window_time
FROM loghub_hdfs_auditlog
GROUP BY TUMBLING(__time__, interval 5 minute), op
HAVING delay(__time__) < '5 minutes';
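Notes:
- "streaming.query.name" can be any name. This setting is required.
- "spark.sql.streaming.checkpointLocation.${streaming.query.name}" sets the checkpoint path for the job identified by "streaming.query.name"; it can be any HDFS path. This setting is required.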
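To make the windowing in this query concrete, here is a small simulation, not EMR's implementation. It assumes that `TUMBLING(__time__, interval 5 minute)` assigns each event to an epoch-aligned, non-overlapping 5-minute window (the output below indeed starts at a multiple of 300000 ms), and that `HAVING delay(__time__) < '5 minutes'` behaves like a Structured Streaming watermark: a window is emitted in append mode once the largest event time seen, minus 5 minutes, has passed the window end. Spark advances its watermark per micro-batch; the sketch advances it per event for simplicity. All names are hypothetical.

```python
from collections import Counter
from typing import Dict, Iterable, List, Optional, Tuple

MINUTE_MS = 60_000


def window_start(ts_ms: int, size_ms: int) -> int:
    """Start of the epoch-aligned tumbling window containing ts_ms."""
    return ts_ms - ts_ms % size_ms


def tumbling_counts(
    events: Iterable[Tuple[int, str]], size_ms: int, delay_ms: int
) -> List[Tuple[int, int, Dict[str, int]]]:
    """Simulate an append-mode tumbling-window count with a watermark.

    events are (event_time_ms, key) pairs in arrival order. The watermark is
    the largest event time seen so far minus delay_ms; a window is emitted
    once its end is at or behind the watermark, and later events for an
    already-closed window are dropped. Windows still open at the end of the
    input are not returned.
    """
    open_windows: Dict[int, Counter] = {}
    emitted: List[Tuple[int, int, Dict[str, int]]] = []
    max_ts: Optional[int] = None
    for ts, key in events:
        max_ts = ts if max_ts is None else max(max_ts, ts)
        watermark = max_ts - delay_ms
        start = window_start(ts, size_ms)
        if start + size_ms <= watermark:
            continue  # too late: this window is already closed
        open_windows.setdefault(start, Counter())[key] += 1
        # Emit (and forget) every window the watermark has passed.
        for s in sorted(open_windows):
            if s + size_ms <= watermark:
                emitted.append((s, s + size_ms, dict(open_windows.pop(s))))
    return emitted
```

For example, with `size_ms = delay_ms = 5 * MINUTE_MS`, an event at 1550493712345 falls into the window 1550493600000 ~ 1550493900000, and that window is emitted only after an event at or beyond 1550494200000 arrives.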
- View the results
Use the Kafka command-line tools to view the data in the kafka_hdfs_auditlog_op_count_in_5_mins topic:
Window: 1550493600000 ~ 1550493900000
{"op":{"string":"create"},"count":47438,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"delete"},"count":181197,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"getfileinfo"},"count":265451,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"listStatus"},"count":641205,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"mkdirs"},"count":12171,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"append"},"count":30981,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"rename"},"count":169709,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"setPermission"},"count":52164,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"open"},"count":436877,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
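The console output is the Avro JSON encoding of each record: nullable (union) fields are wrapped in their type name, for example `{"string":"create"}`, and the window bounds are epoch milliseconds. The window 1550493600000 ~ 1550493900000, for instance, is 2019-02-18 12:40 to 12:45 UTC (20:40 to 20:45 in UTC+8). A small, hypothetical helper for turning such lines into plain values:

```python
import json
from datetime import datetime, timezone

# Avro primitive type names that appear as union wrappers in the JSON encoding.
AVRO_PRIMITIVES = {"null", "boolean", "int", "long", "float", "double", "bytes", "string"}


def unwrap(value):
    """Strip an Avro JSON union wrapper such as {"string": "open"}."""
    if isinstance(value, dict) and len(value) == 1:
        type_name, inner = next(iter(value.items()))
        if type_name in AVRO_PRIMITIVES:
            return inner
    return value


def parse_result(line: str) -> dict:
    """Turn one result record into plain values plus UTC window bounds."""
    record = json.loads(line)
    window = record.pop("window_time")
    out = {name: unwrap(value) for name, value in record.items()}
    for edge in ("start", "end"):
        millis = unwrap(window[edge])  # may be None: the schema allows null
        out["window_" + edge] = (
            None if millis is None
            else datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
        )
    return out


if __name__ == "__main__":
    line = ('{"op":{"string":"create"},"count":47438,"window_time":'
            '{"start":{"long":1550493600000},"end":{"long":1550493900000}}}')
    print(parse_result(line))
```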
3.4 Count open operations per source IP in 1-minute windows
- Register the Kafka topic schema:
{"type":"record",
"name":"TempResult",
"namespace":"org.apache.spark.sql.streaming.test",
"fields":[
{"name":"ip","type":["string","null"]},
{"name":"count","type":"long"},
{"name":"window_time","type":
{"type":"record",
"name":"window_time",
"namespace":"org.apache.spark.sql.streaming.test.window_time",
"fields":[
{"name":"start","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]},
{"name":"end","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]}
]
}
}
]
}
- Create a result table named "kafka_hdfs_auditlog_read_count_per_ip_in_1_mins".
DROP TABLE IF EXISTS kafka_hdfs_auditlog_read_count_per_ip_in_1_mins;
CREATE TABLE kafka_hdfs_auditlog_read_count_per_ip_in_1_mins
USING kafka
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "kafka_hdfs_auditlog_read_count_per_ip_in_1_mins",
output.mode = "append",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "TempResult",
kafka.schema.record.namespace = "org.apache.spark.sql.streaming.test");
- Run the analysis query
SET streaming.query.name=hdfs_auditlog_read_count_per_ip_in_1_mins;
SET spark.sql.streaming.checkpointLocation.hdfs_auditlog_read_count_per_ip_in_1_mins=/tmp/spark/sql/streaming/hdfs_auditlog_read_count_per_ip_in_1_mins;
INSERT INTO kafka_hdfs_auditlog_read_count_per_ip_in_1_mins
SELECT ip, count(*) count, window window_time
FROM loghub_hdfs_auditlog
where cmd='open'
GROUP BY TUMBLING(__time__, interval 1 minute), ip
HAVING delay(__time__) < '1 minutes';
- View the results
Window: 1550540940000 ~ 1550541000000
{"ip":{"string":"172.*.*.130"},"count":69090,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.1"},"count":69266,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"10.*.*.1"},"count":5129,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.52"},"count":70469,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"10.*.*.24"},"count":7206,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.48"},"count":136101,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.19"},"count":67226,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.82"},"count":141886,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.50"},"count":69165,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.54"},"count":151539,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
Window: 1550541000000 ~ 1550541060000
{"ip":{"string":"192.*.*.22"},"count":77776,"window_time":{"start":{"long":1550541000000},"end":{"long":1550541060000}}}
{"ip":{"string":"10.*.*.111"},"count":9373,"window_time":{"start":{"long":1550541000000},"end":{"long":1550541060000}}}
{"ip":{"string":"10.*.*.2"},"count":9329,"window_time":{"start":{"long":1550541000000},"end":{"long":1550541060000}}}
Window: 1550541060000 ~ 1550541120000
{"ip":{"string":"172.*.*.207"},"count":10481,"window_time":{"start":{"long":1550541060000},"end":{"long":1550541120000}}}
{"ip":{"string":"192.*.*.138"},"count":28965,"window_time":{"start":{"long":1550541060000},"end":{"long":1550541120000}}}
{"ip":{"string":"10.*.*.160"},"count":22015,"window_time":{"start":{"long":1550541060000},"end":{"long":1550541120000}}}
{"ip":{"string":"172.*.*.234"},"count":32892,"window_time":{"start":{"long":1550541060000},"end":{"long":1550541120000}}}
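For checking results against a small sample, the query in this section can be restated as a plain batch computation: keep only `cmd='open'`, assign each event to its 1-minute window, and count per (window, ip). This is a hypothetical sketch, not how EMR executes the query. It assumes each record carries `cmd`, `ip` and an `event_time_ms` field in epoch milliseconds (the field name is hypothetical), and it ignores the `delay(__time__)` lateness handling.

```python
from collections import Counter
from typing import Iterable, Mapping

MINUTE_MS = 60_000


def open_count_per_ip(records: Iterable[Mapping], size_ms: int = MINUTE_MS) -> Counter:
    """Batch version of the section 3.4 query on in-memory records.

    Returns a Counter keyed by (window_start, window_end, ip).
    """
    counts: Counter = Counter()
    for r in records:
        if r["cmd"] != "open":  # WHERE cmd='open'
            continue
        ts = r["event_time_ms"]
        start = ts - ts % size_ms  # TUMBLING(__time__, interval 1 minute)
        counts[(start, start + size_ms, r["ip"])] += 1  # GROUP BY window, ip
    return counts


if __name__ == "__main__":
    sample = [  # hypothetical records
        {"cmd": "open", "ip": "10.0.0.1", "event_time_ms": 1550540940000},
        {"cmd": "delete", "ip": "10.0.0.1", "event_time_ms": 1550540950000},
    ]
    print(open_count_per_ip(sample))
```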
3.5 Find source IPs with QPS above 100 in 1-minute windows
- Register the Kafka topic schema:
{"type":"record",
"name":"TempResult",
"namespace":"org.apache.spark.sql.streaming.test",
"fields":[
{"name":"ip","type":["string","null"]},
{"name":"qps","type":["double","null"]},
{"name":"window_time","type":
{"type":"record",
"name":"window_time",
"namespace":"org.apache.spark.sql.streaming.test.window_time",
"fields":[
{"name":"start","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]},
{"name":"end","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]}
]
}
}
]
}
- Create a result table named "kafka_hdfs_auditlog_qps_gt_100_ip_in_1_mins".
DROP TABLE IF EXISTS kafka_hdfs_auditlog_qps_gt_100_ip_in_1_mins;
CREATE TABLE kafka_hdfs_auditlog_qps_gt_100_ip_in_1_mins
USING kafka
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "kafka_hdfs_auditlog_qps_gt_100_ip_in_1_mins",
output.mode = "append",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "TempResult",
kafka.schema.record.namespace = "org.apache.spark.sql.streaming.test");
- Run the analysis query
SET streaming.query.name=hdfs_auditlog_qps_gt_100_ip_in_1_mins;
SET spark.sql.streaming.checkpointLocation.hdfs_auditlog_qps_gt_100_ip_in_1_mins=/tmp/spark/sql/streaming/hdfs_auditlog_qps_gt_100_ip_in_1_mins;
INSERT INTO kafka_hdfs_auditlog_qps_gt_100_ip_in_1_mins
SELECT ip, qps, window window_time
FROM
(SELECT ip, count(cmd)/60 qps, window
FROM loghub_hdfs_auditlog
GROUP BY TUMBLING(__time__, interval 1 minute), ip
HAVING delay(__time__) < '1 minutes') t
WHERE qps > 100;
- View the results
Window: 1550543160000 ~ 1550543220000
{"ip":{"string":"10.*.*.140"},"qps":{"double":328.6333333333333},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"172.*.*.13"},"qps":{"double":276.43333333333334},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"172.*.*.27"},"qps":{"double":228.23333333333332},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"192.*.*.170"},"qps":{"double":407.4},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"172.*.*.95"},"qps":{"double":233.6},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"172.*.*.8"},"qps":{"double":341.3833333333333},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"192.*.*.167"},"qps":{"double":357.73333333333335},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"10.*.*.142"},"qps":{"double":283.05},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"192.*.*.98"},"qps":{"double":371.46666666666664},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"192.*.*.209"},"qps":{"double":501.35},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"10.*.*.12"},"qps":{"double":276.8333333333333},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"10.*.*.98"},"qps":{"double":276.3833333333333},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"172.*.*.169"},"qps":{"double":231.86666666666667},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"10.*.*.166"},"qps":{"double":358.9},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
Window: 1550543220000 ~ 1550543280000
{"ip":{"string":"172.*.*.41"},"qps":{"double":675.3},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"192.*.*.97"},"qps":{"double":364.45},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"172.*.*.17"},"qps":{"double":392.98333333333335},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"192.*.*.234"},"qps":{"double":361.3833333333333},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"192.*.*.72"},"qps":{"double":354.98333333333335},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"172.*.*.3"},"qps":{"double":513.2833333333333},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"10.*.*.1"},"qps":{"double":435.18333333333334},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"10.*.*.240"},"qps":{"double":458.45},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"10.*.*.85"},"qps":{"double":500.46666666666664},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"172.*.*.234"},"qps":{"double":635.1333333333333},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"172.*.*.107"},"qps":{"double":371.76666666666665},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"10.*.*.195"},"qps":{"double":357.46666666666664},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"10.*.*.31"},"qps":{"double":178.95},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
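Because `qps` is `count(cmd)/60`, the threshold of 100 QPS means more than 6,000 log entries with a non-null `cmd` from one IP within a window. For example, the qps of 328.6333333333333 reported above for 10.*.*.140 corresponds to 19,718 entries (19718 / 60). The hypothetical `high_qps_ips` below restates the nested query for a single window, assuming the per-IP counts have already been computed.

```python
from typing import Dict, Mapping

WINDOW_SECONDS = 60


def high_qps_ips(
    counts_per_ip: Mapping[str, int],
    window_seconds: int = WINDOW_SECONDS,
    threshold: float = 100.0,
) -> Dict[str, float]:
    """Mirror of the section 3.5 query for one window.

    counts_per_ip holds count(cmd) per IP for a single 1-minute window.
    """
    # Inner query: count(cmd)/60 AS qps (true division, like Spark SQL's /).
    qps = {ip: n / window_seconds for ip, n in counts_per_ip.items()}
    # Outer query: WHERE qps > 100 (strict, so exactly 6000 entries is excluded).
    return {ip: q for ip, q in qps.items() if q > threshold}


if __name__ == "__main__":
    # Hypothetical per-IP entry counts for one window.
    sample = {"10.0.0.1": 19718, "10.0.0.2": 6000, "10.0.0.3": 4200}
    print(high_qps_ips(sample))
```

Filtering in an outer query, rather than in `HAVING`, keeps the windowed aggregation itself identical to the earlier examples.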
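4. Summary
This article briefly showed how to use Spark Streaming SQL on EMR for streaming analytics. Note that EMR Spark Streaming SQL is still in preview, and its syntax and features are still being expanded and refined.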