天天看點

Spark On MaxCompute如何通路Phonix資料一、購買Hbase1.1并設定對應資源二、安裝Phonix用戶端,并建立表和插入資料三、編寫對應代碼邏輯四、打包上傳到DataWorks進行冒煙測試總結:

一、購買Hbase1.1并設定對應資源

1.1購買hbase

hbase主要版本為2.0與1.1,這邊選擇對應hbase對應的版本為1.1

Hbase與Hbase2.0版本的差別

HBase1.1版本

1.1版本基于HBase社群1.1.2版本開發。

HBase2.0版本

2.0版本是基于社群2018年釋出的HBase2.0.0版本開發的全新版本。同樣,在此基礎上,做了大量的改進和優化,吸收了衆多阿裡内部成功經驗,比社群HBase版本具有更好的穩定性和性能。

Spark On MaxCompute如何通路Phonix資料一、購買Hbase1.1并設定對應資源二、安裝Phonix用戶端,并建立表和插入資料三、編寫對應代碼邏輯四、打包上傳到DataWorks進行冒煙測試總結:

1.2确認VPC,vsWitchID

確定測試聯通性的可以友善可行,該hbase的VPCId,vsWitchID盡量與購買的獨享內建資源組的為一緻的,獨享內建資源的文檔可以參考

https://help.aliyun.com/document_detail/137838.html
Spark On MaxCompute如何通路Phonix資料一、購買Hbase1.1并設定對應資源二、安裝Phonix用戶端,并建立表和插入資料三、編寫對應代碼邏輯四、打包上傳到DataWorks進行冒煙測試總結:

1.3設定hbase白名單,其中DataWorks白名單如下,個人ECS也可添加

Spark On MaxCompute如何通路Phonix資料一、購買Hbase1.1并設定對應資源二、安裝Phonix用戶端,并建立表和插入資料三、編寫對應代碼邏輯四、打包上傳到DataWorks進行冒煙測試總結:

根據文檔連結選擇對應的DataWorks的region下的白名單進行添加

https://help.aliyun.com/document_detail/137792.html
Spark On MaxCompute如何通路Phonix資料一、購買Hbase1.1并設定對應資源二、安裝Phonix用戶端,并建立表和插入資料三、編寫對應代碼邏輯四、打包上傳到DataWorks進行冒煙測試總結:

1.4檢視hbase對應的版本和通路位址

打開資料庫連結的按鈕,可以檢視到Hbase的主版本以及Hbase的專有網絡通路位址,以及是否開通公網通路的方式進行連接配接。

Spark On MaxCompute如何通路Phonix資料一、購買Hbase1.1并設定對應資源二、安裝Phonix用戶端,并建立表和插入資料三、編寫對應代碼邏輯四、打包上傳到DataWorks進行冒煙測試總結:

二、安裝Phonix用戶端,并建立表和插入資料

2.1安裝用戶端

根據hbase的版本為1.1選擇Phonix的版本為4.12.0根據文檔

https://help.aliyun.com/document_detail/53600.html

下載下傳對應的用戶端檔案ali-phoenix-4.12.0-AliHBase-1.1-0.9.tar.gz

登陸用戶端執行指令

./bin/sqlline.py 172.16.0.13,172.16.0.15,172.16.0.12:2181           
Spark On MaxCompute如何通路Phonix資料一、購買Hbase1.1并設定對應資源二、安裝Phonix用戶端,并建立表和插入資料三、編寫對應代碼邏輯四、打包上傳到DataWorks進行冒煙測試總結:

建立表:

CREATE TABLE IF NOT EXISTS users_phonix
(
    id       INT   ,
    username STRING,
    password STRING
) ;           

插入資料:

UPSERT INTO users (id, username, password) VALUES (1, 'admin', 'Letmein');           

2.2檢視是否建立和插入成功

在用戶端執行指令,檢視目前表與資料是否上傳成功

select * from users;           
Spark On MaxCompute如何通路Phonix資料一、購買Hbase1.1并設定對應資源二、安裝Phonix用戶端,并建立表和插入資料三、編寫對應代碼邏輯四、打包上傳到DataWorks進行冒煙測試總結:

三、編寫對應代碼邏輯

3.1編寫代碼邏輯

在IDEA按照對應得Pom檔案進行配置本地得開發環境,将代碼涉及到得配置資訊填寫完整,進行編寫測試,這裡可以先使用Hbase得公網通路連結進行測試,代碼邏輯驗證成功後可調整配置參數,具體代碼如下

package com.git.phonix
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.phoenix.spark._
/**
  * 本執行個體适用于Phoenix 4.x版本
  */
object SparkOnPhoenix4xSparkSession {
  def main(args: Array[String]): Unit = {
    //HBase叢集的ZK連結位址。
    //格式為:xxx-002.hbase.rds.aliyuncs.com,xxx-001.hbase.rds.aliyuncs.com,xxx-003.hbase.rds.aliyuncs.com:2181
    val zkAddress = args(0)
    //Phoenix側的表名,需要在Phoenix側提前建立。Phoenix表建立可以參考:https://help.aliyun.com/document_detail/53716.html?spm=a2c4g.11186623.4.2.4e961ff0lRqHUW
    val phoenixTableName = args(1)
    //Spark側的表名。
    val ODPSTableName = args(2)
    val sparkSession = SparkSession
      .builder()
      .appName("SparkSQL-on-MaxCompute")
      .config("spark.sql.broadcastTimeout", 20 * 60)
      .config("spark.sql.crossJoin.enabled", true)
      .config("odps.exec.dynamic.partition.mode", "nonstrict")
      //.config("spark.master", "local[4]") // 需設定spark.master為local[N]才能直接運作,N為并發數
      .config("spark.hadoop.odps.project.name", "***")
      .config("spark.hadoop.odps.access.id", "***")
      .config("spark.hadoop.odps.access.key", "***")
      //.config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
      .config("spark.hadoop.odps.end.point", "http://service.cn-beijing.maxcompute.aliyun-inc.com/api")
      .config("spark.sql.catalogImplementation", "odps")
      .getOrCreate()
    //第一種插入方式
    var df = sparkSession.read.format("org.apache.phoenix.spark").option("table", phoenixTableName).option("zkUrl",zkAddress).load()
    df.show()
    df.write.mode("overwrite").insertInto(ODPSTableName)
  }
}           

3.2對應Pom檔案

pom檔案中分為Spark依賴,與ali-phoenix-spark相關的依賴,由于涉及到ODPS的jar包,會在叢集中引起jar沖突,是以要将ODPS的包排除掉

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <properties>
        <spark.version>2.3.0</spark.version>
        <cupid.sdk.version>3.3.8-public</cupid.sdk.version>
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <phoenix.version>4.12.0-HBase-1.1</phoenix.version>
    </properties>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>Spark-Phonix</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.jpmml</groupId>
            <artifactId>pmml-model</artifactId>
            <version>1.3.8</version>
        </dependency>
        <dependency>
            <groupId>org.jpmml</groupId>
            <artifactId>pmml-evaluator</artifactId>
            <version>1.3.10</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scalap</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>cupid-sdk</artifactId>
            <version>${cupid.sdk.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun.phoenix</groupId>
            <artifactId>ali-phoenix-core</artifactId>
            <version>4.12.0-AliHBase-1.1-0.8</version>
            <exclusions>
                <exclusion>
                    <groupId>com.aliyun.odps</groupId>
                    <artifactId>odps-sdk-mapred</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.aliyun.odps</groupId>
                    <artifactId>odps-sdk-commons</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.aliyun.phoenix</groupId>
            <artifactId>ali-phoenix-spark</artifactId>
            <version>4.12.0-AliHBase-1.1-0.8</version>
            <exclusions>
                <exclusion>
                    <groupId>com.aliyun.phoenix</groupId>
                    <artifactId>ali-phoenix-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>false</minimizeJar>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <artifactSet>
                                <includes>
                                    <!-- Include here the dependencies you
                                        want to be packed in your fat jar -->
                                    <include>*:*</include>
                                </includes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>**/log4j.properties</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.3.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile-first</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>           

四、打包上傳到DataWorks進行冒煙測試

4.1建立要傳入的MaxCompute表

CREATE TABLE IF NOT EXISTS users_phonix
(
    id       INT   ,
    username STRING,
    password STRING
) ;           

4.2打包上傳到MaxCompute

在IDEA打包要打成shaded包,将所有的依賴包,打入jar包中,由于DatadWork界面方式上傳jar包有50M的限制,是以采用MaxCompute用戶端進行jar包

Spark On MaxCompute如何通路Phonix資料一、購買Hbase1.1并設定對應資源二、安裝Phonix用戶端,并建立表和插入資料三、編寫對應代碼邏輯四、打包上傳到DataWorks進行冒煙測試總結:

4.3選擇對應的project環境,檢視上傳資源,并點選添加到資料開發

進入DataWorks界面選擇左側資源圖示,選擇對應的環境位開發換進,輸入删除檔案時的檔案名稱進行搜尋,清單中展示該資源已經上傳成,點選送出到資料開發

Spark On MaxCompute如何通路Phonix資料一、購買Hbase1.1并設定對應資源二、安裝Phonix用戶端,并建立表和插入資料三、編寫對應代碼邏輯四、打包上傳到DataWorks進行冒煙測試總結:

點選送出按鈕

Spark On MaxCompute如何通路Phonix資料一、購買Hbase1.1并設定對應資源二、安裝Phonix用戶端,并建立表和插入資料三、編寫對應代碼邏輯四、打包上傳到DataWorks進行冒煙測試總結:

4.4配置對應的vpcList參數并送出任務測試

其中的配置vpcList檔案的配置資訊如下,可具體根據個人hbase的連結,進行配置

{
    "regionId":"cn-beijing",
    "vpcs":[
        {
            "vpcId":"vpc-2ze7cqx2bqodp9ri1vvvk",
            "zones":[
                {
                    "urls":[
                        {
                            "domain":"172.16.0.12",
                            "port":2181
                        },
                        {
                            "domain":"172.16.0.13",
                            "port":2181
                        },
                        {
                            "domain":"172.16.0.15",
                            "port":2181
                        },
                        {
                            "domain":"172.16.0.14",
                            "port":2181
                        },
                        {
                            "domain":"172.16.0.12",
                            "port":16000
                        },
                        {
                            "domain":"172.16.0.13",
                            "port":16000
                        },
                        {
                            "domain":"172.16.0.15",
                            "port":16000
                        },
                        {
                            "domain":"172.16.0.14",
                            "port":16000
                        },
                        {
                            "domain":"172.16.0.12",
                            "port":16020
                        },
                        {
                            "domain":"172.16.0.13",
                            "port":16020
                        },
                        {
                            "domain":"172.16.0.15",
                            "port":16020
                        },
                        {
                            "domain":"172.16.0.14",
                            "port":16020
                        }
                    ]
                }
            ]
        }
    ]
}           

Spark任務送出任務的配置參數,主類,以及對應的參數

該參數主要為3個參數第一個為Phonix的連結,第二個為Phonix的表名稱,第三個為傳入的MaxCompute表

Spark On MaxCompute如何通路Phonix資料一、購買Hbase1.1并設定對應資源二、安裝Phonix用戶端,并建立表和插入資料三、編寫對應代碼邏輯四、打包上傳到DataWorks進行冒煙測試總結:

點選冒煙測試按鈕,可以看到任務執行成功

Spark On MaxCompute如何通路Phonix資料一、購買Hbase1.1并設定對應資源二、安裝Phonix用戶端,并建立表和插入資料三、編寫對應代碼邏輯四、打包上傳到DataWorks進行冒煙測試總結:

在臨時查詢節點中執行查詢語句,可以得到資料已經寫入MaxCompute的表中

Spark On MaxCompute如何通路Phonix資料一、購買Hbase1.1并設定對應資源二、安裝Phonix用戶端,并建立表和插入資料三、編寫對應代碼邏輯四、打包上傳到DataWorks進行冒煙測試總結:

總結:

使用Spark on MaxCompute通路Phonix的資料,并将資料寫入到MaxCompute的表中經過實踐,該方案時可行的。但在實踐的時有幾點注意事項:

1.結合實際使用情況選擇對應的Hbase以及Phonix版本,對應的版本一緻,并且所使用的用戶端,以及代碼依賴都會有所改變。

2.使用公網在IEAD進行本地測試,要注意Hbase白名單,不僅要設定DataWorks的白名單,還需将自己本地的位址加入到白名單中。

3.代碼打包時需要将pom中的依賴關系進行梳理,避免ODPS所存在的包在對應的依賴中,進而引起jar包沖突,并且打包時打成shaded包,避免缺失遺漏對應的依賴。

歡迎加入“MaxCompute開發者社群2群”,點選連結申請加入或掃描二維碼

https://h5.dingtalk.com/invite-page/index.html?bizSource=____source____&corpId=dingb682fb31ec15e09f35c2f4657eb6378f&inviterUid=E3F28CD2308408A8&encodeDeptId=0054DC2B53AFE745
Spark On MaxCompute如何通路Phonix資料一、購買Hbase1.1并設定對應資源二、安裝Phonix用戶端,并建立表和插入資料三、編寫對應代碼邏輯四、打包上傳到DataWorks進行冒煙測試總結: