天天看點

使用EMR SQL 批處理Tablestore

通過在E-MapReduce叢集中使用Spark SQL通路表格存儲。對于批計算,Tablestore on Spark提供索引選擇、分區裁剪、Projection列和Filter下推、動态指定分區大小等功能,利用表格存儲的全局二級索引或者多元索引可以加速查詢。

前提條件

  • 已建立E-MapReduce Hadoop叢集。具體操作,請參見 建立叢集

    建立叢集時,請確定打開挂載公網開關,将叢集挂載到公網,用于Shell遠端登入伺服器。

    說明

    本文使用Shell指令示範,如果需要使用E-MapReduce的圖形化頁面進行資料開發。具體操作,請參見

    資料開發
    使用EMR SQL 批處理Tablestore
  • 已上傳 emr-datasources_shaded_2.11-2.2.0-SNAPSHOT.jar 包到EMR Header伺服器。

Spark連接配接表格存儲資料表和全局二級索引

Spark連接配接到表格存儲資料表和全局二級索引後,通過Spark外表查詢資料時,系統會根據查詢條件中設定的列條件自動選擇索引表進行查詢。

1. 在表格存儲側建立資料表或全局二級索引

  1. 建立表格存儲的資料表。具體操作,請參見 概述 本示例中資料表名稱為tpch_lineitem_perf,主鍵列為l_orderkey(LONG類型)、l_linenumber(LONG類型),屬性列分别為l_comment(STRING類型)、l_commitdate(STRING類型)、l_discount(DOUBLE類型)、l_extendedprice(DOUBLE類型)、l_linestatus(STRING類型)、l_partkey(LONG類型)、l_quantity(DOUBLE類型)、l_receiptdate(STRING類型)等14列,資料條數為384016850,資料樣例如下圖所示。
    使用EMR SQL 批處理Tablestore
  2. (可選)在資料表上建立全局二級索引。具體操作,請參見 使用SDK
    當查詢條件中需要使用資料表的非主鍵列,建議建立全局二級索引加速查詢。
    全局二級索引支援在指定列上建立索引,生成的索引表中資料按指定的索引列進行排序,資料表的每一個資料寫入都将自動以異步方式同步到索引表。
    使用EMR SQL 批處理Tablestore

2. 在EMR叢集側建立Spark外表

  1. 登入EMR Header伺服器。
  2. 執行如下指令啟動spark-sql指令行,用于Spark外表建立和後續的SQL實戰操作。

    其中Spark的标準啟動參數為

    --num-executors 32 --executor-memory 2g --executor-cores 2

    ,可以根據具體的叢集配置進行自定義調整。表示上傳jar包的版本資訊,請根據實際填寫,例如2.1.0-SNAPSHOT。
    spark-sql --jars emr-datasources_shaded_2.11-<Version>.jar --master yarn --num-executors 32 --executor-memory 2g --executor-cores 2           
  3. 建立Spark外表同時連接配接全局二級索引。
    • 參數
    endpoint 表格存儲執行個體通路位址,EMR叢集中使用VPC位址。
    access.key.id 阿裡雲賬号的AccessKey ID。
    access.key.secret 阿裡雲賬号的AccessKey Secret。
    instance.name
    table.name 表格存儲的資料表名稱。
    split.size.mbs 每個Split的大小,預設值為100 MB。
    max.split.count 資料表計算出的最大Split數,并發數和Spark的Split個數對應,預設值為1000。
    catalog 資料表的Schema定義。
    • 執行個體
      DROP TABLE IF EXISTS tpch_lineitem;
      CREATE TABLE tpch_lineitem
      USING tablestore
      OPTIONS(
      endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
      access.key.id="",
      access.key.secret="",
      instance.name="vehicle-test",
      table.name="tpch_lineitem_perf",
      split.size.mbs=10,
      max.split.count=1000,
      catalog='{"columns":{"l_orderkey":{"type":"long"},"l_partkey":{"type":"long"},"l_suppkey":{"type":"long"},"l_linenumber":{"type":"long"},"l_quantity":{"type":"double"},"l_extendedprice":{"type":"double"},"l_discount":{"type":"double"},"l_tax":{"type":"double"},"l_returnflag":{"type":"string"},"l_linestatus":{"type":"string"},"l_shipdate":{"type":"string"},"l_commitdate":{"type":"string"},"l_receiptdate":{"type":"string"},"l_shipinstruct":{"type":"string"},"l_shipmode":{"type":"string"},"l_comment":{"type":"string"}}}'
      );           

3. SQL查詢實戰

如下是不同查詢需求的SQL查詢樣例,請根據實際業務組合使用SQL查詢。

  • 全表查詢
    • SQL語句:SELECT COUNT(*) FROM tpch_lineitem;
    • SQL總耗時:36.199s、34.711s、34.801s,平均耗時35.237s。
  • 主鍵查詢
    • SQL語句:SELECT COUNT(*) FROM tpch_lineitem WHERE l_orderkey = 1 AND l_linenumber = 1;
    • 表格存儲服務端:GetRow操作,平均耗時為0.585 ms。
  • 非主鍵查詢,未開啟全局二級索引
    • SQL語句:SELECT count(*) FROM tpch_lineitem WHERE l_shipdate = '1996-06-06';
    • SQL總耗時:37.006s、37.269s、37.17s,平均耗時37.149s。
  • 非主鍵查詢,開啟全局二級索引
    • SQL總耗時(開啟l_shipdate列的全局二級索引):1.686s、1.651s、1.784s,平均耗時1.707s。

Spark連接配接表格存儲資料表和多元索引

Spark連接配接到表格存儲資料表和多元索引後,通過Spark外表查詢資料時,系統會自動使用設定的多元索引進行查詢。

1. 在表格存儲側建立資料表和多元索引

  1. 建立資料表。具體操作,請參見 本示例中資料表名稱為geo_table,主鍵列為pk1(String類型),屬性列分别為val_keyword1(String類型)、val_keyword2(String類型)、val_keyword3(String類型)、val_bool(Boolean類型)、val_double(Double類型)、val_long1(Long類型)、val_long2(Long類型)、val_text(String類型)和val_geo(String類型),資料條數為208912382,資料樣例如下圖所示。
    使用EMR SQL 批處理Tablestore
  2. 在資料表上建立多元索引。具體操作,請參見 建立及使用多元索引 建立多元索引時,根據字段類型選擇對應的多元索引Mapping。
    說明:建立多元索引時,地理位置字段需選擇字段類型為地理位置而非字元串類型。
    使用EMR SQL 批處理Tablestore
    建立多元索引後,多元索引會自動開始同步資料表中的資料,待多元索引進入增量狀态時,表示多元索引完成建構。
    使用EMR SQL 批處理Tablestore

在EMR叢集側建立Spark外表

  1. 登入EMR Header伺服器
  2. 建立Spark外表同時連接配接多元索引
    push.down.range.long 與Long類型的列做大小(>=、>、<、<=)比較的謂詞是否下推。更多資訊,請參見 批計算謂詞下推配置 。類型為Boolean,預設值為true,表示與Long類型的列做大小比較的謂詞下推。設定為false時,表示與Long類型的列做大小比較的謂詞不下推。
    push.down.range.string 與String類型的列做大小(>=、>、<、<=)比較的謂詞是否下推。更多資訊,請參見 。類型為Boolean,預設值為true,表示與String類型的列做大小比較的謂詞下推。設定為false時,表示與String類型的列做大小比較的謂詞不下推。
    • 示例
      DROP TABLE IF EXISTS geo_table;
      CREATE TABLE geo_table (
      pk1 STRING, val_keyword1 STRING, val_keyword2 STRING, val_keyword3 STRING,
      val_bool BOOLEAN, val_double DOUBLE, val_long1 LONG, val_long2 LONG,
      val_text STRING, val_geo STRING COMMENT "geo stored in string format"
      )
      USING tablestore
      OPTIONS(
      endpoint="https://sparksearchtest.cn-hangzhou.vpc.tablestore.aliyuncs.com",
      access.key.id="",
      access.key.secret="",
      instance.name="sparksearchtest",
      table.name="geo_table",
      search.index.name="geo_table_index",
      max.split.count=64,
      push.down.range.long = false,
      push.down.range.string = false
      );           

  • 使用多元索引全表查詢
    • SQL語句:SELECT COUNT(*) FROM geo_table;
    • SQL耗時:測試資料208912382條,配置64個Parallel Scan并發,實際耗時165.208s,平均QPS約126.45萬。
      208912382
      Time taken: 165.208 seconds, Fetched 1 row(s)
      20/06/29 20:55:11 INFO [main] SparkSQLCLIDriver: Time taken: 165.208 seconds, Fetched 1 row(s)           
  • 組合條件查詢
    • SQL語句:

      SELECT val_long1, val_long2, val_keyword1, val_double FROM geo_table WHERE (val_long1 > 17183057 AND val_long1 < 27183057) AND (val_long2 > 1000 AND val_long2 < 5000) LIMIT 100;

    • SQL耗時:Spark會将Projection列和Filter下推到多元索引,實際耗時2.728s,極大加快查詢效率。
      21423964        4017    aaa     2501.9901650365096
      21962236        2322    eio     2775.9021545044116
      Time taken: 2.894 seconds, Fetched 100 row(s)
      20/06/30 18:51:24 INFO [main] SparkSQLCLIDriver: Time taken: 2.894 second           
  • 地理位置查詢

地理位置查詢包括地理距離查詢、地理長方形查詢和地理多邊形範圍查詢三種地理位置查詢方式。示例中val_geo為地理位置字段名,地理坐标的格式都為"緯度,經度"。

  • 地理距離查詢

    文法為val_geo = '{"centerPoint":"中心點坐标", "distanceInMeter": 距離中心點的距離}'。

    SELECT COUNT(*) FROM geo_table WHERE val_geo = '{"centerPoint":"6.530045901643962,9.05358919674954", "distanceInMeter": 3000.0}';

    • 地理長方形查詢

      文法為val_geo = '{"topLeft":"矩形框的左上角的坐标", "bottomRight": "矩形框的右下角的坐标"}'。

      SELECT COUNT(*) FROM geo_table WHERE val_geo = '{"topLeft":"6.257664116603074,9.1595116589601", "bottomRight": "6.153593333442616,9.25968497923747"}';

  • 地理多邊形範圍查詢

    文法為val_geo = '{"points":["坐标1", "坐标2", .... "坐标n-1", "坐标n"]}'。

    SELECT COUNT(*) FROM geo_table WHERE val_geo = '{"points":["6.530045901643962,9.05358919674954", "6.257664116603074,9.1595116589601", "6.160393397574926,9.256517839929597", "6.16043846779313,9.257192872563525"]}';