天天看點

如何使用MaxCompute Spark讀寫阿裡雲Hbase背景Hbase标準版Hbase增強版

背景

Spark on MaxCompute可以通路位于阿裡雲VPC内的執行個體(例如ECS、HBase、RDS),預設MaxCompute底層網絡和外網是隔離的,Spark on MaxCompute提供了一種方案通過配置spark.hadoop.odps.cupid.vpc.domain.list來通路阿裡雲的vpc網絡環境的Hbase。Hbase标準版和增強版的配置不同,本文通過通路阿裡雲的标準版和增強版的Hbase簡單的描述需要加的配置。

Hbase标準版

環境準備

Hbase的網絡環境是存在vpc下的,是以我們首先要添加安全組開放端口2181、10600、16020.同時Hbase有白名單限制我們需要把對應的MaxCompute的IP加入到Hbase的白名單。

設定對應vpc的安全組

如何使用MaxCompute Spark讀寫阿裡雲Hbase背景Hbase标準版Hbase增強版

找到對應的vpc id然後添加安全組設定端口

如何使用MaxCompute Spark讀寫阿裡雲Hbase背景Hbase标準版Hbase增強版

添加Hbase的白名單

在hbase的白名單添加

100.104.0.0/16           

建立Hbase表

create 'test','cf'           

編寫Spark程式

需要的Hbase依賴

<dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-mapreduce</artifactId>
      <version>2.0.2</version>
    </dependency>
     <dependency>
      <groupId>com.aliyun.hbase</groupId>
      <artifactId>alihbase-client</artifactId>
      <version>2.0.5</version>
    </dependency>           

編寫代碼

object App {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("HbaseTest")
      .config("spark.sql.catalogImplementation", "odps")
      .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
      .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
      .getOrCreate()

    val sc = spark.sparkContext
    val config = HBaseConfiguration.create()
    val zkAddress = "hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181"
    config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress);
    val jobConf = new JobConf(config)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test")


    try{

      import spark._
      spark.sql("select '7', 88 ").rdd.map(row => {
        val name= row(0).asInstanceOf[String]
        val id = row(1).asInstanceOf[Integer]
        val put = new Put(Bytes.toBytes(id))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
        (new ImmutableBytesWritable, put)
      }).saveAsHadoopDataset(jobConf)
    } finally {
      sc.stop()
    }
  }
}           

送出到DataWorks

由于大于50m通過odps用戶端送出

add jar SparkHbase-1.0-SNAPSHOT -f;            

進入資料開發建立spark節點

如何使用MaxCompute Spark讀寫阿裡雲Hbase背景Hbase标準版Hbase增強版

添加配置

需要配置spark.hadoop.odps.cupid.vpc.domain.list

這裡的hbase域名需要hbase所有的機器,少一台可能會造成網絡不通

{
  "regionId":"cn-beijing",
  "vpcs":[
    {
      "vpcId":"vpc-2zeaeq21mb1dmkqh0exox",
      "zones":[
        {
          "urls":[
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":2181
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":2181
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":2181
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            }
          ]
        }
      ]
    }
  ]
}           
如何使用MaxCompute Spark讀寫阿裡雲Hbase背景Hbase标準版Hbase增強版

Hbase增強版

Hbase增強版的端口是30020、10600、16020.同時Hbase有白名單限制我們需要把對應的MaxCompute的IP加入到Hbase的白名單。

如何使用MaxCompute Spark讀寫阿裡雲Hbase背景Hbase标準版Hbase增強版
100.104.0.0/16           

建立Hbase表 

create 'test','cf'           

需要的Hbase依賴,引用的包必須是阿裡雲增強版的依賴

<dependency>
      <groupId>com.aliyun.hbase</groupId>
      <artifactId>alihbase-client</artifactId>
      <version>2.0.8</version>
    </dependency>           
object McToHbase {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("spark_sql_ddl")
      .config("spark.sql.catalogImplementation", "odps")
      .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
      .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
      .getOrCreate()

      val sc = spark.sparkContext


    try{
      spark.sql("select '7', 'long'").rdd.foreachPartition { iter =>
        val config = HBaseConfiguration.create()
        // 叢集的連接配接位址(VPC内網位址)在控制台頁面的資料庫連接配接界面獲得
        config.set("hbase.zookeeper.quorum", ":30020");
        import spark._
        // xml_template.comment.hbaseue.username_password.default
        config.set("hbase.client.username", "");
        config.set("hbase.client.password", "");
        val tableName = TableName.valueOf( "test")
        val conn = ConnectionFactory.createConnection(config)
        val table = conn.getTable(tableName);
        val puts = new util.ArrayList[Put]()
        iter.foreach(
          row => {
            val id = row(0).asInstanceOf[String]
            val name = row(1).asInstanceOf[String]
            val put = new Put(Bytes.toBytes(id))
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
            puts.add(put)
            table.put(puts)
          }
        )
      }
  } finally {
    sc.stop()
  }



  }
}           

注意

hbase clinet會報org.apache.spark.SparkException: Task not serializable

原因是spark會把序列化對象以将其發送給其他的worker

解決方案

- 使類可序列化
- 僅在map中傳遞的lambda函數中聲明執行個體。
- 将NotSerializable對象設定為靜态對象,并在每台計算機上建立一次。
- 調用rdd.forEachPartition并在其中建立

Serializable對象,如下所示:

rdd.forEachPartition(iter-> {NotSerializable notSerializable = new NotSerializable();<br />// ...現在處理iter});
           
add jar SparkHbase-1.0-SNAPSHOT -f;            
如何使用MaxCompute Spark讀寫阿裡雲Hbase背景Hbase标準版Hbase增強版

注意:

1.這個裡需要添加增強版java api通路位址,這裡必須采用ip的形式。ip通過直接ping該位址擷取,這裡的ip是172.16.0.10添加端口16000

如何使用MaxCompute Spark讀寫阿裡雲Hbase背景Hbase标準版Hbase增強版

2.這裡的hbase域名需要hbase所有的機器,少一台可能會造成網絡不通

{
  "regionId":"cn-beijing",
  "vpcs":[
    {
      "vpcId":"vpc-2zeaeq21mb1dmkqh0exox",
      "zones":[
        {
          "urls":[
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":30020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":30020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":30020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
             {"domain":"172.16.0.10","port":16000}
          ]
        }
      ]
    }
  ]
}           
如何使用MaxCompute Spark讀寫阿裡雲Hbase背景Hbase标準版Hbase增強版

大家如果對MaxCompute有更多咨詢或者建議,歡迎掃碼加入 MaxCompute開發者社群釘釘群,或點選

連結

申請加入。

如何使用MaxCompute Spark讀寫阿裡雲Hbase背景Hbase标準版Hbase增強版