背景
Spark on MaxCompute可以通路位于阿裡雲VPC内的執行個體(例如ECS、HBase、RDS),預設MaxCompute底層網絡和外網是隔離的,Spark on MaxCompute提供了一種方案通過配置spark.hadoop.odps.cupid.vpc.domain.list來通路阿裡雲的vpc網絡環境的Hbase。Hbase标準版和增強版的配置不同,本文通過通路阿裡雲的标準版和增強版的Hbase簡單的描述需要加的配置。
Hbase标準版
環境準備
Hbase的網絡環境是存在vpc下的,是以我們首先要添加安全組開放端口2181、10600、16020.同時Hbase有白名單限制我們需要把對應的MaxCompute的IP加入到Hbase的白名單。
設定對應vpc的安全組

找到對應的vpc id然後添加安全組設定端口
添加Hbase的白名單
在hbase的白名單添加
100.104.0.0/16
建立Hbase表
create 'test','cf'
編寫Spark程式
需要的Hbase依賴
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>com.aliyun.hbase</groupId>
<artifactId>alihbase-client</artifactId>
<version>2.0.5</version>
</dependency>
編寫代碼
object App {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("HbaseTest")
.config("spark.sql.catalogImplementation", "odps")
.config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
.config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
.getOrCreate()
val sc = spark.sparkContext
val config = HBaseConfiguration.create()
val zkAddress = "hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181"
config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress);
val jobConf = new JobConf(config)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test")
try{
import spark._
spark.sql("select '7', 88 ").rdd.map(row => {
val name= row(0).asInstanceOf[String]
val id = row(1).asInstanceOf[Integer]
val put = new Put(Bytes.toBytes(id))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
(new ImmutableBytesWritable, put)
}).saveAsHadoopDataset(jobConf)
} finally {
sc.stop()
}
}
}
送出到DataWorks
由于大于50m通過odps用戶端送出
add jar SparkHbase-1.0-SNAPSHOT -f;
進入資料開發建立spark節點
添加配置
需要配置spark.hadoop.odps.cupid.vpc.domain.list
這裡的hbase域名需要hbase所有的機器,少一台可能會造成網絡不通
{
"regionId":"cn-beijing",
"vpcs":[
{
"vpcId":"vpc-2zeaeq21mb1dmkqh0exox",
"zones":[
{
"urls":[
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":2181
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":2181
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":2181
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
"port":16020
}
]
}
]
}
]
}
Hbase增強版
Hbase增強版的端口是30020、10600、16020.同時Hbase有白名單限制我們需要把對應的MaxCompute的IP加入到Hbase的白名單。
100.104.0.0/16
建立Hbase表
create 'test','cf'
需要的Hbase依賴,引用的包必須是阿裡雲增強版的依賴
<dependency>
<groupId>com.aliyun.hbase</groupId>
<artifactId>alihbase-client</artifactId>
<version>2.0.8</version>
</dependency>
object McToHbase {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("spark_sql_ddl")
.config("spark.sql.catalogImplementation", "odps")
.config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
.config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
.getOrCreate()
val sc = spark.sparkContext
try{
spark.sql("select '7', 'long'").rdd.foreachPartition { iter =>
val config = HBaseConfiguration.create()
// 叢集的連接配接位址(VPC内網位址)在控制台頁面的資料庫連接配接界面獲得
config.set("hbase.zookeeper.quorum", ":30020");
import spark._
// xml_template.comment.hbaseue.username_password.default
config.set("hbase.client.username", "");
config.set("hbase.client.password", "");
val tableName = TableName.valueOf( "test")
val conn = ConnectionFactory.createConnection(config)
val table = conn.getTable(tableName);
val puts = new util.ArrayList[Put]()
iter.foreach(
row => {
val id = row(0).asInstanceOf[String]
val name = row(1).asInstanceOf[String]
val put = new Put(Bytes.toBytes(id))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
puts.add(put)
table.put(puts)
}
)
}
} finally {
sc.stop()
}
}
}
注意
hbase clinet會報org.apache.spark.SparkException: Task not serializable
原因是spark會把序列化對象以将其發送給其他的worker
解決方案
- 使類可序列化
- 僅在map中傳遞的lambda函數中聲明執行個體。
- 将NotSerializable對象設定為靜态對象,并在每台計算機上建立一次。
- 調用rdd.forEachPartition并在其中建立
Serializable對象,如下所示:
rdd.forEachPartition(iter-> {NotSerializable notSerializable = new NotSerializable();<br />// ...現在處理iter});
add jar SparkHbase-1.0-SNAPSHOT -f;
注意:
1.這個裡需要添加增強版java api通路位址,這裡必須采用ip的形式。ip通過直接ping該位址擷取,這裡的ip是172.16.0.10添加端口16000
2.這裡的hbase域名需要hbase所有的機器,少一台可能會造成網絡不通
{
"regionId":"cn-beijing",
"vpcs":[
{
"vpcId":"vpc-2zeaeq21mb1dmkqh0exox",
"zones":[
{
"urls":[
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":30020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":30020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":30020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{"domain":"172.16.0.10","port":16000}
]
}
]
}
]
}
大家如果對MaxCompute有更多咨詢或者建議,歡迎掃碼加入 MaxCompute開發者社群釘釘群,或點選
連結申請加入。