
DataWorks_Data Development_EMR Spark Node_Computing Pi and MaxCompute Integration Examples

Example 1: Spark's built-in sample project SparkPi: computing Pi

This article uses the Spark built-in sample project that computes Pi to verify that the current EMR Spark environment is usable. For details about the sample project, see the EMR sample project usage guide.

Preparation:

Find the path of Spark's built-in example JAR spark-examples_2.11-2.4.5.jar. The Spark component is installed under /usr/lib/spark-current; logging in to the EMR cluster shows the full path /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar. For details, see Common EMR file paths.

Run the task:

Create an EMR Spark node and submit the code. You only need to fill in the part that follows spark-submit; job submission fills in the rest automatically.

Submitted code:

--class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100      

What actually runs:

# spark-submit [options] --class [MainClass] xxx.jar args
spark-submit --class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100      

Check the result:

The log returns 1097: Pi is roughly 3.1415547141554714. The job ran successfully and the environment is usable.
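The result is only approximate because SparkPi estimates Pi by Monte Carlo sampling. A minimal sketch of the same idea in Scala (illustrative only; PiSketch is not the shipped example class):

import scala.math.random
import org.apache.spark.{SparkConf, SparkContext}

// Sample n points uniformly in the square [-1, 1] x [-1, 1] and count how
// many fall inside the unit circle; the area ratio gives Pi / 4.
object PiSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PiSketch"))
    val slices = 100                  // plays the role of the "100" argument above
    val n = 100000 * slices           // total number of sample points
    val inside = sc.parallelize(1 until n, slices).map { _ =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x * x + y * y <= 1) 1 else 0
    }.reduce(_ + _)
    println(s"Pi is roughly ${4.0 * inside / n}")
    sc.stop()
  }
}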


Example 2: Spark integration with MaxCompute

This example connects Spark to MaxCompute and uses Spark to count the rows of a MaxCompute table. For more scenarios, see the EMR Spark development guide.

Cloud products involved in this example: a DataWorks workspace bound to both the EMR and MaxCompute engines, and OSS.

Prepare test data:

In DataWorks Data Development, create an ODPS SQL node and run the statements below to create the table and insert data. The first column is BIGINT, and 2 records are inserted.

DROP TABLE IF EXISTS emr_spark_read_odpstable;
CREATE TABLE IF NOT EXISTS emr_spark_read_odpstable
(
    id BIGINT
    ,name STRING
);
INSERT INTO TABLE emr_spark_read_odpstable VALUES (111,'zhangsan'),(222,'lisi');

Local development:

Create a Maven project and add the pom dependency below. For details, see Spark preparation.
<dependency>
    <groupId>com.aliyun.emr</groupId>
    <artifactId>emr-maxcompute_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
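Note that the _2.11 suffix in the artifactId is the Scala version; it matches the Scala 2.11 build used by the cluster's Spark 2.4.5 (see spark-examples_2.11-2.4.5.jar above), so keep it consistent with your cluster.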

The plugin section is for reference only.

<build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
          
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <configuration>
                    <recompileMode>incremental</recompileMode>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
</build>

Write the code:

Implement a Spark job that counts the rows of the MaxCompute table via its first column (BIGINT). For details, see Spark integration with MaxCompute. When finished, build the JAR; the ODPS-related dependencies are third-party packages, so they must be bundled into the JAR and uploaded to the cluster together.

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.aliyun.emr.example.spark
import com.aliyun.odps.TableSchema
import com.aliyun.odps.data.Record
import org.apache.spark.aliyun.odps.OdpsOps
import org.apache.spark.{SparkConf, SparkContext}
object SparkMaxComputeDemo {
  def main(args: Array[String]): Unit = {
    if (args.length < 6) {
      System.err.println(
        """Usage: SparkMaxComputeDemo <accessKeyId> <accessKeySecret> <envType> <project> <table> <numPartitions>
          |
          |Arguments:
          |
          |    accessKeyId      Aliyun Access Key ID.
          |    accessKeySecret  Aliyun Access Key Secret.
          |    envType          0 or 1
          |                     0: Public environment.
          |                     1: Aliyun internal environment, i.e. Aliyun ECS etc.
          |    project          Aliyun ODPS project
          |    table            Aliyun ODPS table
          |    numPartitions    the number of RDD partitions
        """.stripMargin)
      System.exit(1)
    }
    val accessKeyId = args(0)
    val accessKeySecret = args(1)
    val envType = args(2).toInt
    val project = args(3)
    val table = args(4)
    val numPartitions = args(5).toInt
    val urls = Seq(
      Seq("http://service.odps.aliyun.com/api", "http://dt.odps.aliyun.com"), // public environment
      Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") // Aliyun internal environment
    )
    val conf = new SparkConf().setAppName("E-MapReduce Demo 3-1: Spark MaxCompute Demo (Scala)")
    val sc = new SparkContext(conf)
    val odpsOps = envType match {
      case 0 =>
        OdpsOps(sc, accessKeyId, accessKeySecret, urls(0)(0), urls(0)(1))
      case 1 =>
        OdpsOps(sc, accessKeyId, accessKeySecret, urls(1)(0), urls(1)(1))
    }
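    // Read the table as an RDD by applying the transfer function `read` to every record.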
    val odpsData = odpsOps.readTable(project, table, read, numPartitions)
    println(s"Count (odpsData): ${odpsData.count()}")
  }
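  // Transfer function: extracts the first column (BIGINT) of each record, so the RDD element type is Long.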
  def read(record: Record, schema: TableSchema): Long = {
    record.getBigint(0)
  }
}      
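The third argument to readTable is the transfer function that turns each Record into an RDD element; the demo's read keeps only the first column. If you also needed the name column, a sketch under the two-column schema created above could look like this (readPair is an illustrative name meant to sit alongside read in the object above; it is not part of the library):

  // Sketch: map each record to (id, name) instead of only the id column.
  // Assumes the (id BIGINT, name STRING) schema of emr_spark_read_odpstable.
  def readPair(record: Record, schema: TableSchema): (Long, String) =
    (record.getBigint(0), record.getString(1))

  // Used the same way as `read`:
  // val pairs = odpsOps.readTable(project, table, readPair, numPartitions)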

Upload the run resource:

Log in to the OSS console and upload the JAR resource to the target path (first-time use requires one-click authorization; for details, see the one-click authorization section of the EMR MR node documentation).

In this example, emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar is uploaded under oss://oss-cn-shanghai-internal.aliyuncs.com/onaliyun-bucket-2/emr_BE/spark_odps/.

Note: the DataWorks EMR resource size limit is 50 MB, and a jar-with-dependencies package is usually larger than 50 MB, so the upload is done directly in the OSS console. If your resource is smaller than 50 MB, you can also work in DataWorks; see Creating and using EMR JAR resources.

Create an EMR JAR resource:

In this example, create an emr_spark_demo-1.0-SNAPSHOT.jar resource, upload the JAR built above, and store it under oss://oss-cn-shanghai-internal.aliyuncs.com/onaliyun-bucket-2/emr_BE/spark_odps/ (first-time use requires one-click authorization). Submit the resource; after submission it can be viewed in the OSS console. For details, see Creating and using EMR JAR resources.


Create and run the EMR Spark node:

In this example, under the EMR data development module of the business flow, right-click to create an EMR Spark node named emr_spark_odps, select the EMR engine instance, submit the following code, and click Advanced Run.

Replace the arguments with the values you actually use.

--class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn-client ossref://onaliyun-bucket-2/emr_BE/spark_odps/emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar <accessKeyId> <accessKeySecret> 1 onaliyun_workshop_dev emr_spark_read_odpstable 1      
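Per the Usage block in the demo code, the positional arguments after the JAR path map to accessKeyId, accessKeySecret, envType (1, the Aliyun internal environment, since the EMR cluster runs on ECS), project (onaliyun_workshop_dev), table (emr_spark_read_odpstable), and numPartitions (1). The ossref:// path refers to the JAR uploaded to OSS earlier, so the cluster can fetch it when the job is submitted.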

Check the logs: the table record count is 2, as expected.
