天天看點

RDS&POLARDB歸檔到X-Pack Spark計算最佳實踐

業務背景

對于RDS&POLARDB FOR MYSQL 有些使用者場景會遇到,當一張的資料達到幾千萬時,你查詢一次所花的時間會變多。

這時候會采取水準分表的政策,水準拆分是将同一個表的資料進行分塊儲存到不同的資料庫中,這些資料庫中的表結構完全相同。

本文主要介紹如何把這些水準分表的表歸檔到X-Pack Spark數倉,做統一的大資料計算。X-Pack Spark服務通過外部計算資源的方式,為Redis、Cassandra、MongoDB、HBase、RDS存儲服務提供複雜分析、流式處理及入庫、機器學習的能力,進而更好的解決使用者資料處理相關場景問題。

具體産品見
RDS&POLARDB歸檔到X-Pack Spark計算最佳實踐

RDS& POLARDB分表歸檔到X-Pack Spark步驟

一鍵關聯POLARDB到Spark叢集

一鍵關聯主要是做好spark通路RDS& POLARDB的準備工作。

RDS&POLARDB歸檔到X-Pack Spark計算最佳實踐

POLARDB表存儲

在database ‘test1’中每5分鐘生成一張表,這裡假設為表 'test1'、'test2'、'test2'、... 。

RDS&POLARDB歸檔到X-Pack Spark計算最佳實踐

具體的建表語句如下:

CREATE TABLE `test1` ( `a` int(11) NOT NULL,
                        `b` time DEFAULT NULL,          
               `c` double DEFAULT NULL,
                         PRIMARY KEY (`a`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8           

歸檔到Spark的調試

x-pack spark提供互動式查詢模式支援直接在控制台送出sql、python腳本、scala code來調試,

幫助文檔

1、首先建立一個互動式查詢的session,在其中添加mysql-connector的jar包,

可參考
wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_connectors/mysql-connector-java-5.1.34.jar           
RDS&POLARDB歸檔到X-Pack Spark計算最佳實踐

2、建立互動式查詢

以pyspark為例,下面是具體歸檔demo的代碼:

spark.sql("drop table sparktest").show()
# 建立一張spark表,三級分區,分别是天、小時、分鐘,最後一級分鐘用來存儲具體的5分鐘的一張polardb表達的資料。字段和polardb裡面的類型一緻
spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) "
      "USING parquet PARTITIONED BY (dt ,hh ,mm )").show()

#本例子在polardb裡面建立了databse test1,具有三張表test1 ,test2,test3,這裡周遊這三張表,每個表存儲spark的一個5min的分區
# CREATE TABLE `test1` (
#     `a` int(11) NOT NULL,
#                     `b` time DEFAULT NULL,
#                                      `c` double DEFAULT NULL,
#                                                         PRIMARY KEY (`a`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8
for num in range(1, 4): 
    #構造polardb的表名
    dbtable = "test1." + "test" + str(num)
    #spark外表關聯polardb對應的表
    externalPolarDBTableNow = spark.read \
        .format("jdbc") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", "jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306") \
        .option("dbtable", dbtable) \
        .option("user", "name") \
        .option("password", "xxx*") \
        .load().registerTempTable("polardbTableTemp")
    #生成本次polardb表資料要寫入的spark表的分區資訊
    (dtValue, hhValue, mmValue) = ("20191015", "13", str(05 * num))
    #執行導資料sql 
    spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s )  "
          "select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show()
    #删除臨時的spark映射polardb表的catalog
    spark.catalog.dropTempView("polardbTableTemp")
    #檢視下分區以及統計下資料,主要用來做測試驗證,實際運作過程可以删除
    spark.sql("show partitions sparktest").show(1000, False)
    spark.sql("select count(*) from sparktest").show()           
RDS&POLARDB歸檔到X-Pack Spark計算最佳實踐

歸檔作業上生産

互動式查詢定位為臨時查詢及調試,生産的作業還是建議使用spark作業的方式運作,使用

文檔參考

。這裡以

pyspark作業為例:

RDS&POLARDB歸檔到X-Pack Spark計算最佳實踐

/polardb/polardbArchiving.py 内容如下:

# -*- coding: UTF-8 -*-

from __future__ import print_function

import sys
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("PolardbArchiving") \
        .enableHiveSupport() \
        .getOrCreate()

    spark.sql("drop table sparktest").show()
    # 建立一張spark表,三級分區,分别是天、小時、分鐘,最後一級分鐘用來存儲具體的5分鐘的一張polardb表達的資料。字段和polardb裡面的類型一緻
    spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) "
          "USING parquet PARTITIONED BY (dt ,hh ,mm )").show()

    #本例子在polardb裡面建立了databse test1,具有三張表test1 ,test2,test3,這裡周遊這三張表,每個表存儲spark的一個5min的分區
    # CREATE TABLE `test1` (
    #     `a` int(11) NOT NULL,
    #      `b` time DEFAULT NULL,
    #      `c` double DEFAULT NULL,
    #       PRIMARY KEY (`a`)
    # ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    for num in range(1, 4):
        #構造polardb的表名
        dbtable = "test1." + "test" + str(num)
        #spark外表關聯polardb對應的表
        externalPolarDBTableNow = spark.read \
            .format("jdbc") \
            .option("driver", "com.mysql.jdbc.Driver") \
            .option("url", "jdbc:mysql://pc-.mysql.polardb.rds.aliyuncs.com:3306") \
            .option("dbtable", dbtable) \
            .option("user", "ma,e") \
            .option("password", "xxx*") \
            .load().registerTempTable("polardbTableTemp")
        #生成本次polardb表資料要寫入的spark表的分區資訊
        (dtValue, hhValue, mmValue) = ("20191015", "13", str(05 * num))
        #執行導資料sql
        spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s )  "
              "select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show()
        #删除臨時的spark映射polardb表的catalog
        spark.catalog.dropTempView("polardbTableTemp")
        #檢視下分區以及統計下資料,主要用來做測試驗證,實際運作過程可以删除
        spark.sql("show partitions sparktest").show(1000, False)
        spark.sql("select count(*) from sparktest").show()
    spark.stop()
           

阿裡雲NoSQL資料庫其他動态

阿裡雲Cassandra資料庫正式公測,提供免費試用:

https://www.aliyun.com/product/cds
RDS&POLARDB歸檔到X-Pack Spark計算最佳實踐

繼續閱讀