
Reading multiple HBase tables from Spark, using a Scan to filter out the desired information.

Straight to the code:

package com.tophant

import java.text.SimpleDateFormat

import com.tophant.html.Util_html
import com.tophant.util.Util
import org.apache.commons.lang.{StringEscapeUtils, StringUtils}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object Hbase2hdfs {
  def main(args: Array[String]): Unit = {
    val logger = LoggerFactory.getLogger(getClass)
    // exactly five command-line arguments are required
    if (args.length != 5) {
      logger.error("Wrong number of arguments")
      logger.error("Usage: Hbase2hdfs <table prefix> <start date> <end date> <output path> <host>")
      System.exit(1)
    }

    // parse the command-line arguments
    val tableName = args(0).trim  // HBase table name prefix
    val startDate = args(1).trim  // start date for the table name suffix
    val endDate   = args(2).trim  // end date for the table name suffix
    val path = args(3).trim       // HDFS output path
    val host = args(4).trim       // host value to filter on
    // build the date list from the start and end dates
    // e.g. for 20160118 to 20160120 the date list is (20160118, 20160119, 20160120)
    val dateSet = getDateSet(startDate, endDate)

    val sparkConf = new SparkConf()
      .setAppName("hbase2hdfs")
    //      .set("spark.default.parallelism","12")
    val sc = new SparkContext(sparkConf)

    val conf = HBaseConfiguration.create()
    // set the ZooKeeper quorum; this can also be done by putting hbase-site.xml on the classpath,
    // but setting it in the program is recommended
    conf.set("hbase.zookeeper.quorum", "10.0.71.501,10.0.71.502,10.0.71.503")
    // set the ZooKeeper client port, default 2181
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    // assemble the Scan
    val scan = new Scan()
    scan.setCacheBlocks(false)
    scan.addFamily(Bytes.toBytes("rd"))
    scan.addColumn(Bytes.toBytes("rd"), Bytes.toBytes("host"))
    scan.addColumn("rd".getBytes, "url".getBytes)
    scan.addColumn("rd".getBytes, "http.request.method".getBytes)
    scan.addColumn("rd".getBytes, "http.request.header".getBytes)
    scan.addColumn("rd".getBytes, "http.response.header".getBytes)  // read later in the map, so it must be selected here


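    // keep only rows whose rd:host column equals the requested host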
    val hostFilter = new SingleColumnValueFilter(Bytes.toBytes("rd"),
      Bytes.toBytes("host"),
      CompareOp.EQUAL,
      Bytes.toBytes(host))
    scan.setFilter(hostFilter)
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    val proto = ProtobufUtil.toScan(scan)
    val ScanToString = Base64.encodeBytes(proto.toByteArray)
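    // TableInputFormat only accepts the Scan through the job configuration as a
    // Base64-encoded, protobuf-serialized string, hence the conversion above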


    // read one RDD per date into a Set, then merge them into a single RDD with SparkContext.union()
    var rddSet: Set[RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Result)] ] = Set()
    dateSet.foreach(date => {
      conf.set(TableInputFormat.SCAN, ScanToString)
      conf.set(TableInputFormat.INPUT_TABLE, tableName + date) // set the table name (prefix + date)
      val bRdd: RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Result)] =
        sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])
      rddSet += bRdd
    })

    val hBaseRDD = sc.union(rddSet.toSeq)

    val kvRDD = hBaseRDD
      .map { case (_, result) =>
        // read each column by column family and qualifier
        val url = Bytes.toString(result.getValue("rd".getBytes, "url".getBytes))
        val host = Bytes.toString(result.getValue("rd".getBytes, "host".getBytes))
        val httpRequestHeaders = Bytes.toString(result.getValue("rd".getBytes, "http.request.header".getBytes))
        val httpResponseHeaders = Bytes.toString(result.getValue("rd".getBytes, "http.response.header".getBytes))

        val value = url + "$$$" + httpRequestHeaders + "$$$" + httpResponseHeaders

        (host, value)
      }

    // each (host, value) tuple is written out as its toString, one line per record
    kvRDD.saveAsTextFile(path)
  }

  def getDateSet(sDate: String, eDate: String): Set[String] = {
    // the set of dates to generate
    var dateSet: Set[String] = Set()

    // date format of the table name suffix
    val sdf = new SimpleDateFormat("yyyyMMdd")

    // convert the start and end dates to milliseconds using the format above
    val sDate_ms = sdf.parse(sDate).getTime
    val eDate_ms = sdf.parse(eDate).getTime

    // milliseconds in one day, used as the iteration step
    val day_ms = 24 * 60 * 60 * 1000

    // loop to generate the date list
    var tm = sDate_ms
    while (tm <= eDate_ms) {
      val dateStr = sdf.format(tm)
      dateSet += dateStr
      tm = tm + day_ms
    }

    // return the date list
    dateSet
  }
}
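
As a quick sanity check, the date-list helper can be exercised on its own from a Scala REPL or spark-shell. A minimal sketch, assuming only that the Hbase2hdfs object above is on the classpath; the dates reuse the example from the comments:

// a minimal sketch: verify that getDateSet expands an inclusive date range
val dates = Hbase2hdfs.getDateSet("20160118", "20160120")
println(dates.toSeq.sorted.mkString(", "))  // expected output: 20160118, 20160119, 20160120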