天天看點

通過Spark SQL實時歸檔SLS資料1. 流式計算和SQL2. Spark SQL流式開發入門2.5 流式作業配置3. SLS資料實時歸檔實戰4. 結語

作者:木艮,阿裡雲E-MapReduce開發工程師

我在前一篇文章介紹過基于

Spark SQL實作對HDFS操作的實時監控報警 。今天,我再舉例說明一下如何使用Spark SQL進行流式應用的開發。本文主要分成三部分:

  • 流式計算和SQL
  • 簡要介紹Spark SQL流式開發文法
  • 實時歸檔SLS資料到HDFS

1. 流式計算和SQL

資料的價值随着時間逐漸降低。及時盡早的對資料進行處理提升了資料的價值,是以流式計算系統的應用也越來越廣泛。目前常用的流式計算架構有Storm,Spark Streaming及Flink等,也有Kafka Streams這類基于Kafka的流式處理類庫。各種流式處理架構都有其各自的API,開發者不可避免的需要學習如何使用這些API。如何提供簡單而有效的開發工具,進而把更多的精力投放在業務進行中。是以,各個流式處理系統都逐漸支援SQL API作為開發語言,讓使用者可以像處理Table一樣處理Stream。例如KSQL支援使用SQL進行流式處理Kafka資料。Spark同樣提出來Structured Streaming作為最新一代的流式處理系統,底層的處理引擎也是Spark SQL。不過在上層SQL API,缺少Structured Streaming必要的功能,例如window,watermark等。EMR在Spark開源版本上進行了功能擴充,支援使用SQL API在Spark上進行完整的流式查詢開發。

2. Spark SQL流式開發入門

這節将簡要介紹Spark SQL中關于流式開的概念和文法。

2.1 建表

當我們需要對流式資料源進行讀寫操作時,需要首先建立一張表來表示這個資料源。定義表的文法如下:

CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]
USING providerName
OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*);
           

以上文法中,針對特殊source,不要求一定指定表的列定義。當不指定列定義時,會自動識别資料源的schema資訊。舉一個例子:

CREATE TABLE driver_behavior 
USING kafka 
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "${TOPIC_NAME}",
output.mode = "${OUTPUT_MODE}",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");
           

當資料源是Kafka時,會根據Kafka Topic名去到Kafka Schema Registry中查找schema資訊。當然,我們也可以指定列定義,例如:

CREATE TABLE driverbehavior(deviceId string, velocity double)
USING kafka 
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "${TOPIC_NAME}",
output.mode = "${OUTPUT_MODE}",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");
           

當指定列定義時,要求必須和Source中的字段定義是一緻的。當執行完CREATE TABLE操作,表的定義會儲存到Hive MetaStore中。

2.2 CTAS

我們可以将建立表和将查詢結果寫入到表的語句合并到一起,那麼就是CREATE TABLE ... AS SELECT ...文法:

CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]
USING providerName
OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*)
AS
queryStatement;
           

舉一個例子(引用自這裡: q103):

CREATE TABLE kafka_temp_table
USING kafka
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "${TOPIC_NAME}",
output.mode = "${OUTPUT_MODE}",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}") AS
SELECT
  i_brand_id brand_id,
  i_brand brand,
  sum(ss_ext_sales_price) ext_price
FROM date_dim, kafka_store_sales, item
WHERE d_date_sk = ss_sold_date_sk
  AND ss_item_sk = i_item_sk
  AND i_manager_id = 28
  AND d_moy = 11
  AND d_year = 1999
  AND delay(ss_data_time) < '2 minutes'
GROUP BY TUMBLING(ss_data_time, interval 1 minute), i_brand, i_brand_id
           

當執行完操作,将建立出表并實際生成一個StreamQuery執行個體,将查詢結果寫入到結果表中。

2.3 DML

流式查詢SQL和離線SQL标準文法大部分是一樣,這邊主要介紹insert操作。流式查詢是不允許單獨進行SELECT操作,必須将SELECT的查詢結果寫入到表中。是以,需要在SELECT操作之前執行INSERT操作。

INSERT INTO tbName[(columnName[,columnName]*)]
queryStatement;
           

以上文法為一次流式查詢:這個語句将實際生成一個StreamQuery執行個體,将查詢結果寫入到結果表中。舉一個例子:

INSERT INTO kafka_temp_table
SELECT
  i_brand_id brand_id,
  i_brand brand,
  sum(ss_ext_sales_price) ext_price
FROM date_dim, kafka_store_sales, item
WHERE d_date_sk = ss_sold_date_sk
  AND ss_item_sk = i_item_sk
  AND i_manager_id = 28
  AND d_moy = 11
  AND d_year = 1999
  AND delay(ss_data_time) < '2 minutes'
GROUP BY TUMBLING(ss_data_time, interval 1 minute), i_brand, i_brand_id
           

2.4 window及watermark

限于篇幅,本文暫且不介紹Spark SQL中如何使用window和watermak,有興趣的可以先看看資料,後續會專門撰文介紹。

2.5 流式作業配置

使用SQL進行流式作業開發時,有些必要的配置無法在Query表達出來,需要單獨進行設定。這裡我們使用SET操作進行流式作業必要參數配置,目前有兩個參數需要設定:

通過Spark SQL實時歸檔SLS資料1. 流式計算和SQL2. Spark SQL流式開發入門2.5 流式作業配置3. SLS資料實時歸檔實戰4. 結語

每一個流式查詢執行個體前都需要進行配置,也就是說,當使用CTAS或者Insert操作時,必須前置這兩個配置。一個SQL檔案支援多個流式查詢,例如:

-- test.sql

SET streaming.query.name=query1;
SET spark.sql.streaming.checkpointLocation.query1=/tmp/spark/query1
INSERT INTO tbName1 [(columnName[,columnName]*)]
queryStatement1;

SET streaming.query.name=query2;
SET spark.sql.streaming.checkpointLocation.query2=/tmp/spark/query2
INSERT INTO tbName2 [(columnName[,columnName]*)]
queryStatement2;
           

3. SLS資料實時歸檔實戰

假定一個場景,現在通過SLS收集了業務伺服器上的日志,需要歸檔到HDFS中,便于後續進行離線分析。這裡涉及到兩個資料源:SLS和HDFS。HDFS是Spark官方支援的資料源,支援流和批的讀寫。SLS是阿裡雲的服務,EMR已經支援了流式讀寫。

  • 環境準備

需要E-MapReduce 3.21.0以上版本叢集環境,目前正在釋出準備中,很快和大家見面,敬請期待。

  • 指令行
spark-sql --master yarn-client --conf spark.sql.streaming.datasource.provider=loghub --jars emr-logservice_shaded_2.11-1.7.0-SNAPSHOT.jar
           

注:emr-logservice_shaded_2.11-1.7.0-SNAPSHOT.jar将會在EMR SDK 1.7.0版本釋出出來。

  • 分别建立兩張表:sls_service_log和hdfs_service_log
CREATE DATABASE IF NOT EXISTS default;
USE default;

DROP TABLE IF EXISTS hdfs_service_log;
CREATE TABLE hdfs_service_log (instance_name string, ip string, content string)
USING PARQUET
LOCATION '/tmp/hdfs_service_log';

DROP TABLE IF EXISTS sls_service_log;
CREATE TABLE sls_service_log
USING loghub
OPTIONS (
sls.project = "${logProjectName}",
sls.store = "${logStoreName}",
access.key.id = "${accessKeyId}",
access.key.secret = "${accessKeySecret}",
endpoint = "${endpoint}");
- 通過Spark SQL啟動一個Stream Query将SLS資料實時同步到HDFS中

set streaming.query.name=sync_sls_to_hdfs;
set spark.sql.streaming.checkpointLocation.sync_sls_to_hdfs=hdfs:///tmp/spark/sync_sls_to_hdfs;

INSERT INTO hdfs_service_log
select
__tag__hostname__ as instance_name,
ip,
content
from sls_service_log;
檢視HDFS資料歸檔情況
image
           
  • 使用Spark SQL對歸檔的資料進行離線分析:例如統計一共有多少個IP
select distinct(ip) from hdfs_service_log;
           
通過Spark SQL實時歸檔SLS資料1. 流式計算和SQL2. Spark SQL流式開發入門2.5 流式作業配置3. SLS資料實時歸檔實戰4. 結語

4. 結語

以上,我們介紹了Spark SQL在流式進行中的一個非常簡單的例子。其實,我們還可以使用Spark SQL進行更加複雜的流式處理任務。後續文章,我将介紹視窗操作,watermark等概念,以及如何在流式資料上進行簡單的機器學習運算。

通過Spark SQL實時歸檔SLS資料1. 流式計算和SQL2. Spark SQL流式開發入門2.5 流式作業配置3. SLS資料實時歸檔實戰4. 結語

繼續閱讀