Straight to the code:
package com.tophant
import java.text.SimpleDateFormat
import com.tophant.html.Util_html
import com.tophant.util.Util
import org.apache.commons.lang.{StringEscapeUtils, StringUtils}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory
object Hbase2hdfs {
def main(args: Array[String]): Unit = {
val logger = LoggerFactory.getLogger(getClass)
// There must be exactly 5 command-line arguments
if (args.length != 5) {
logger.error("Wrong number of arguments")
logger.error("Usage: Hbase2hdfs <table prefix> <start date> <end date> <output path> <host>")
System.exit(1)
}
// Read the command-line arguments: table prefix, date range of the behaviour data, output path and host
val tableName = args(0).trim // prefix of the HBase table names
val startDate = args(1).trim // start date of the table-name suffix
val endDate = args(2).trim // end date of the table-name suffix
val path = args(3).trim
val host = args(4).trim
// Build the list of dates from the start and end dates
// e.g. for 20160118 and 20160120 the list is (20160118, 20160119, 20160120)
val dateSet = getDateSet(startDate, endDate)
val sparkConf = new SparkConf()
.setAppName("hbase2hdfs")
// .set("spark.default.parallelism","12")
val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
// Set the ZooKeeper quorum; this could also be done by putting hbase-site.xml on the classpath, but setting it in code is recommended here
conf.set("hbase.zookeeper.quorum", "10.0.71.501,10.0.71.502,10.0.71.503")
// ZooKeeper client port, 2181 by default
conf.set("hbase.zookeeper.property.clientPort", "2181")
// Assemble the Scan
val scan = new Scan()
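// Disable block caching for this scan: a full-table batch scan would otherwise churn the region servers' block cache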
scan.setCacheBlocks(false)
scan.addFamily(Bytes.toBytes("rd"))
scan.addColumn(Bytes.toBytes("rd"), Bytes.toBytes("host"))
scan.addColumn("rd".getBytes, "url".getBytes)
scan.addColumn("rd".getBytes, "http.request.method".getBytes)
scan.addColumn("rd".getBytes, "http.request.header".getBytes)
val hostFilter = new SingleColumnValueFilter(Bytes.toBytes("rd"),
Bytes.toBytes("host"),
CompareOp.EQUAL,
Bytes.toBytes(host))
scan.setFilter(hostFilter)
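// Note: by default SingleColumnValueFilter still emits rows that have no rd:host column at all;
// call hostFilter.setFilterIfMissing(true) if those rows should be dropped as well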
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
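// TableInputFormat can only pick up the Scan from the job Configuration, so it is serialized to a protobuf and Base64-encoded string first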
val proto = ProtobufUtil.toScan(scan)
val scanString = Base64.encodeBytes(proto.toByteArray)
// Read one RDD per date into a Set, then merge them into a single RDD with SparkContext.union()
var rddSet: Set[RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Result)]] = Set()
dateSet.foreach(date => {
conf.set(TableInputFormat.SCAN, scanString)
conf.set(TableInputFormat.INPUT_TABLE, tableName + date) // set the table name: prefix + date
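// newAPIHadoopRDD with TableInputFormat typically yields one Spark partition per HBase region of this daily table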
val bRdd: RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Result)] =
sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
rddSet += bRdd
})
val hBaseRDD = sc.union(rddSet.toSeq)
// Get each column by family and qualifier, and key every record by its host
val kvRDD = hBaseRDD.map { case (_, result) =>
val url = Bytes.toString(result.getValue(Bytes.toBytes("rd"), Bytes.toBytes("url")))
val rowHost = Bytes.toString(result.getValue(Bytes.toBytes("rd"), Bytes.toBytes("host")))
val httpRequestHeaders = Bytes.toString(result.getValue(Bytes.toBytes("rd"), Bytes.toBytes("http.request.header")))
val httpResponseHeaders = Bytes.toString(result.getValue(Bytes.toBytes("rd"), Bytes.toBytes("http.response.header")))
val value = url + "$$$" + httpRequestHeaders + "$$$" + httpResponseHeaders
(rowHost, value)
}
kvRDD.saveAsTextFile(path)
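// saveAsTextFile writes each element via toString, i.e. one line per record of the form (host,url$$$requestHeaders$$$responseHeaders)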
}
def getDateSet(sDate: String, eDate: String): Set[String] = {
// The set of dates to generate
var dateSet: Set[String] = Set()
// Date format
val sdf = new SimpleDateFormat("yyyyMMdd")
// Parse the start and end dates into milliseconds using the format above
val sDate_ms = sdf.parse(sDate).getTime
val eDate_ms = sdf.parse(eDate).getTime
// Milliseconds in one day, used as the iteration step
val day_ms = 24*60*60*1000
// Walk from the start date to the end date, one day at a time
var tm = sDate_ms
while (tm <= eDate_ms) {
val dateStr = sdf.format(tm)
dateSet += dateStr
tm = tm + day_ms
}
// Return the set of dates
dateSet
}
}
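As a quick sanity check of the date expansion on its own, here is a minimal sketch (the object name GetDateSetCheck is made up for illustration; it only assumes Hbase2hdfs is on the classpath). It reproduces the example from the comment above: 20160118 to 20160120 expands to three dates.
import com.tophant.Hbase2hdfs

object GetDateSetCheck {
  def main(args: Array[String]): Unit = {
    // Inclusive range: expect 20160118, 20160119, 20160120
    val dates = Hbase2hdfs.getDateSet("20160118", "20160120")
    println(dates.toSeq.sorted.mkString(", "))
  }
}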