天天看點

使用EMR DataFrame 流處理 Tablestore

使用Spark的DataFrame方式通路表格存儲,并在本地和叢集上分别進行運作調試。

前提條件

  • 了解Spark通路表格存儲的依賴包,并在使用時通過maven方式引入項目中。
    • Spark相關:spark-core、spark-sql、spark-hive
    • Spark Tablestore connector:emr-tablestore-.jar
    • Tablestore Java SDK:tablestore--jar-with-dependencies.jar
    其中表示相應依賴包的版本号,請以實際為準。
  • 已在表格存儲側建立Source表和在Source表上建立通道,詳情請參見 概述

快速開始

通過項目樣例了解快速使用流計算的操作。

  1. 從GitHub下載下傳項目樣例的源碼,具體下載下傳路徑請參見 TableStoreSparkDemo 。項目中包含完整的依賴和使用樣例,具體的依賴請參見項目中的pom檔案。

2.閱讀TableStoreSparkDemo項目的README文檔,并安裝最新版的Spark Tablestore connector和Tablestore Java SDK到本地maven庫。

說明

Spark Tablestore connector正式版釋出以月為周期,目前最新版尚未正式釋出,請先使用項目附帶的預覽版,正式釋出後,本文也會進行更新,敬請期待。預覽版和正式版隻是版本号的差別,互相相容,業務代碼邏輯無需改動。

3.修改Sample代碼。

以StructuredTableStoreAggSQLSample為例,對此示例代碼的核心代碼說明如下:

  • format("tablestore")表示使用ServiceLoader方式加載Spark Tablestore connector,具體配置請參見項目中的META-INF.services。
  • instanceName、tableName、tunnel.id、endpoint、accessKeyId、accessKeySecret分别表示表格存儲的執行個體名稱、資料表名稱、通道ID、執行個體endpoint、阿裡雲賬号的AccessKey ID和AccessKey Secret。
  • catalog是一個JSON串,包含字段名和類型,如下示例中的資料表有UserId(STRING類型)、OrderId(STRING類型)、price(DOUBLE類型)和timestamp(LONG類型)四個字段。
  • maxoffsetsperchannel表示每一個mini-batch中每一個channel(分區)最多讀取的資料量,預設值為10000。
val ordersDF = sparkSession.readStream
      .format("tablestore")
      .option("instance.name", instanceName)
      .option("table.name", tableName)
      .option("tunnel.id", tunnelId)
      .option("endpoint", endpoint)
      .option("access.key.id", accessKeyId)
      .option("access.key.secret", accessKeySecret)
      .option("maxoffsetsperchannel", maxOffsetsPerChannel) //預設值為10000。
      .option("catalog", dataCatalog)
      .load()
      .createTempView("order_source_stream_view")

  val dataCatalog: String =
    s"""
       |{"columns": {
       |    "UserId": {"type":"string"},
       |    "OrderId": {"type":"string"},
       |    "price": {"type":"double"},
       |    "timestamp": {"type":"long"}
       | }
       |}""".stripMargin           

運作調試

根據需求修改示例代碼後,可在本地或者通過Spark叢集進行運作調試。以StructuredTableStoreAggSQLSample為例說明調試過程。

  • 本地調試

以Intellij IDEA為例說明。

本文測試使用的環境為Spark 2.4.3、Scala 2.11.7和Java SE Development Kit 8,如果使用中遇到問題,請聯系表格存儲技術支援。
  • 在系統參數中,配置執行個體名稱、資料表名稱、執行個體endpoint、阿裡雲賬号的AccessKey ID和AccessKey Secret等參數。您也可以自定義參數的加載方式。
  • 選擇include dependencies with "provided" scope,單擊OK。
  • 運作示例代碼程式。
使用EMR DataFrame 流處理 Tablestore
  • 通過Spark叢集調試

以spark-submit方式為例說明。示例代碼中的master預設為local[*],在Spark叢集上運作時可以去掉,使用spark-submit參數傳入。

  • 執行

    mvn -U clean package

    指令打包,包的路徑為target/tablestore-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar。
  • 上傳包到Spark叢集的Driver節點,并使用spark-submit送出任務。
    spark-submit --class com.aliyun.tablestore.spark.demo.streaming.StructuredTableStoreAggSQLSample --master yarn tablestore-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar <ots-instanceName> <ots-tableName> <ots-tunnelId> <access-key-id> <access-key-secret> <ots-endpoint> <max-offsets-per-channel>           
使用EMR DataFrame 流處理 Tablestore