天天看点

Spark——Spark读写HBaseSpark读HBaseSpark写HBaseSpark应用程序依赖的jar包

文章目录

  • Spark读HBase
    • 1. 使用newAPIHadoopRDD API
  • Spark写HBase
    • 1. saveAsNewAPIHadoopFile API
    • 2. BulkLoad
  • Spark应用程序依赖的jar包

Spark读HBase

1. 使用newAPIHadoopRDD API

代码实现:

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}


def readHBase(spark: SparkSession): Unit = {
    val config = HBaseConfiguration.create()
    config.set(TableInputFormat.INPUT_TABLE, "test") //表名
    config.set(TableInputFormat.SCAN_ROW_START, "start_key") //扫描起始rowKey
    config.set(TableInputFormat.SCAN_ROW_STOP, "stop_key") //扫描终止rowKey

    //HBase表加载为RDD[(K, V)]
    val rdd = spark.sparkContext.newAPIHadoopRDD(config,
        classOf[TableInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result]
    )

    //从Result中获取指定列最新版本的值
    val rdd1 = rdd.map(m => {
        //获取一行查询结果
        val result: Result = m._2

        val rowKey = Bytes.toString(result.getRow) //获取row key
        val userId = Bytes.toString(result.getValue("cf".getBytes,"user_id".getBytes))
        val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))
        val age = Bytes.toString(result.getValue("cf".getBytes,"age".getBytes))

        Row(rowKey, userId, name, age)
    })

    //创建schema
    val schema = StructType(
      StructField("user_id", IntegerType, false) ::
      StructField("name", StringType, false) ::
      StructField("age", IntegerType, true) :: Nil)

    //RDD转为DataFrame
    val df = spark.createDataFrame(rdd1, schema)
    
    df.select("name", "age")
}
           

Spark写HBase

1. saveAsNewAPIHadoopFile API

代码实现:

import com.alibaba.fastjson.JSON
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, TableInputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.{Dataset, SparkSession}

import scala.collection.mutable.ArrayBuffer

def writeHBase(spark: SparkSession, datasetJson: Dataset[String]): Unit = {
    \//假设datasetJson是一个json字符串类型的Dataset, 其结构为"{"name":"", "age":"", "phone":"", "address":"", }"
    val ds = spark.read.json(datasetJson)
    
    val rdd = ds.rdd.mapPartitions(iterator => {
        iterator.map(m => {
            val json = JSON.parseObject(m.toString()) //json字符串解析成JSONObject
            val phone = json.getString("phone") //phone作为KeyValue的row key
            json.remove("phone") //以便遍历其他所有键值对

            val writable = new ImmutableBytesWritable(Bytes.toBytes(phone)) //键值对字节序列
            val array = ArrayBuffer[(ImmutableBytesWritable, KeyValue)]() //初始化数组, 存储JSONObject中的键值对
            
			//JSON中的key作为Hbase表中的列名,并按字典序排序
            val jsonKeys = json.keySet().toArray.map(_.toString).sortBy(x => x) 

            val length = jsonKeys.length
            for (i <- 0 until length) {
                val key = jsonKeys(i)
                val value = json.get(jsonKeys(i)).toString
                //KeyValue为HBase中的基本类型Key/Value。
                //构造函数中的参数依次为:rowkey、列族、列名、值。
                //Json对象中的每个key和其值构成HBase中每条记录的value
                val keyValue: KeyValue = new KeyValue(
                    Bytes.toBytes(phone),  //row key
                    "cf".getBytes(), //列族名
                    key.getBytes(), //列名
                    value.getBytes()) //列的值

                array += ((writable, keyValue))
            }
            array
        })
        //重新分区,减少保存的文件数
        //展开数组中的元素
        //对rowkey排序
    }).repartition(1).flatMap(x => x).sortByKey()

    val config = HBaseConfiguration.create()
    config.set(TableInputFormat.INPUT_TABLE, "test") //表名

    val job = Job.getInstance(config)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])

    //持久化到HBase表
    rdd.saveAsNewAPIHadoopFile("/tmp/test",
        classOf[ImmutableBytesWritable],
        classOf[KeyValue],
        classOf[HFileOutputFormat2],
        job.getConfiguration)

}
           

2. BulkLoad

在将大批量数据离线导入HBase系统的场景中,不管是使用Put单条插入还是Put批量插入HBase,效率都不理想,还可能占用大量带宽资源,导致其他业务程序运行变慢。这个时候就可以考虑使用Bulk Load,Bulk Load使用MapReduce将待写入数据转换成HFile文件,再直接将这些HFile文件加载到集群中,BulkLoad并没有将写入请求发送给RegionServer,不会占用太多集群资源,效率极高。

代码实现:

import java.util

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.spark.{ByteArrayWrapper, FamiliesQualifiersValues, FamilyHFileWriteOptions, HBaseContext}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes


def handle(spark: SparkSession, df: DataFrame, key: String, source: String, stagingDir: String): Unit = {
	val sc = spark.sparkContext

	\//使用HBase资源创建一个Configuration
	val config = HBaseConfiguration.create()
	\//Zookeeper节点, 端口号
	config.set("hbase.zookeeper.quorum", "node-3,node-4,node-5")
	config.set("hbase.zookeeper.property.clientPort", "2181")

	\//初始化HbaseContext
	val hbaseContext = new HBaseContext(sc, config)

	\//DataFrame中的列名
	val columns = df.columns

	\//HBase表
	val tableName = TableName.valueOf("hbase_table_name")
	val conn = ConnectionFactory.createConnection(config)

	\//HBase column family
	val columnFamily: Array[Byte] = Bytes.toBytes("cf")

	\//HBase bulk load对于那些少于1000个列短row的Spark实现
	df.rdd.hbaseBulkLoadThinRows(
		hbaseContext,
		tableName, \//要加载进数据的表
		t => { \//一个要将RDD records转化成key value格式的函数
			\//rowKey
			val rowKey = t.getAs(key).toString

			\//保存并排序将要bulk load到单个row中的所有cells
			val familyQualifiersValues = new FamiliesQualifiersValues

			\//遍历所有列
			columns.foreach(c => {
				val qualifier = Bytes.toBytes(c) \//HBase column qualifier
				val value: Array[Byte] = Bytes.toBytes(t.getAs(c).toString) \//HBase cell value

				\//添加一个新的cell到一个已存在的row
				familyQualifiersValues.+=(columnFamily, qualifier, value)
			})

			\//添加字段(字段表示数据来源)
			familyQualifiersValues.+=(columnFamily, Bytes.toBytes("source"), Bytes.toBytes(source))

			(new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
		},
		stagingDir, \//要load到文件系统的路径
		new util.HashMap[Array[Byte], FamilyHFileWriteOptions], \//一个选项, 可以设置列族写入HFile的方式(compression, bloomType, blockSize, dataBlockEncoding)
		compactionExclude = false \//Compaction excluded for the HFiles
	)

	\//加载HFileOutputFormat的输出到一个已存在的表
	val load = new LoadIncrementalHFiles(config)

	\//将指定目录下的数据load到指定的表
	load.doBulkLoad(new Path(stagingDir),
		conn.getAdmin,
		conn.getTable(tableName),
		conn.getRegionLocator(tableName)
	)
}
           

Spark应用程序依赖的jar包

spark-submit提交任务时需要依赖的第三方jar包:

fastjson-1.2.62.jar
hbase-protocol-1.2.0.jar
hbase-common-1.2.0.jar
hbase-spark-1.2.0-cdh5.16.2.jar
hbase-client-1.2.0.jar
hbase-server-1.2.0.jar
htrace-core-3.1.0-incubating.jar
           

继续阅读