1.求統計數值 V1.0
// Spark context: run locally with 3 worker threads.
val conf = new SparkConf().setAppName("hh")
conf.setMaster("local[3]")
val sc = new SparkContext(conf)
// Parse tab-separated numeric rows; key = "k" + first column, value = second column.
// (Array indices were stripped in the original paste; f(0)/f(1) restored from the
// k1/k4/k7/k10 keys visible in the sample output.)
val data = sc.textFile("/home/hadoop4/Desktop/i.txt")
  .map(_.split("\t")).map(row => row.map(_.toDouble))
  .map(f => ("k" + f(0), f(1)))
// variance
//data:RDD[(String,Double)]
// Wrap each value in a one-element buffer so the per-key values can later be concatenated.
val dataArr = data.map { case (key, value) => (key, ArrayBuffer(value)) }
// dataArr: RDD[(String, ArrayBuffer[Double])]
dataArr.collect().foreach(println)
//output
(k1,ArrayBuffer())
(k1,ArrayBuffer())
(k4,ArrayBuffer())
(k4,ArrayBuffer())
(k7,ArrayBuffer())
(k10,ArrayBuffer())
(k10,ArrayBuffer())
(k10,ArrayBuffer())
(k1,ArrayBuffer())
(k10,ArrayBuffer())
(k10,ArrayBuffer())
(k1,ArrayBuffer())
(k4,ArrayBuffer())
// Concatenate the per-record buffers key by key (++= mutates the left buffer in place).
val dataArrRed = dataArr.reduceByKey(_ ++= _)
// dataArrRed: RDD[(String, ArrayBuffer[Double])]
dataArrRed.collect().foreach(println)
//output
(k1,ArrayBuffer(, , , ))
(k7,ArrayBuffer())
(k10,ArrayBuffer(, , , , ))
(k4,ArrayBuffer(, , ))
// NOTE(review): collect() pulls every grouped buffer to the driver, and each buffer is
// then turned back into an RDD just to reuse the built-in statistics — this is what
// makes spark.driver.maxResultSize a problem for large data (see discussion below).
val dataARM = dataArrRed.collect().map(
  f => (f._1, sc.makeRDD(f._2, 3))) // partition count was stripped; 3 matches local[3]
val dataARMM = dataARM.map(
  f => (f._1, (f._2.variance(), f._2.max(), f._2.min())))
// Original chained .foreach onto the val, which made dataARMM Unit — print separately.
dataARMM.foreach(println(_))
sc.stop()
//output
(k1,(,,))
(k7,(,,))
(k10,(,,))
(k4,(,,))
由于在 reduceByKey 後要先 collect,若 driver.maxResultSize 不夠大就會報錯。driver.maxResultSize 預設為 1g,在資料量大時會報錯,因此我把 driver.maxResultSize 設定為了 10g。
2. Spark 同時計算 多列的統計值
這方法還是有問題,如何避免collect??
解決方法,筆者在StackOverFlow問的問題:how to avoid collect when using statistic stat
data:
val dataString = xx.trim // trim removes leading and trailing whitespace
val dataArray = dataString.split("\\n")
  .map(_.split("\\s+")) // \\s matches any whitespace (space, tab, newline); + means one or more
  .map(_.map(_.toDouble))
  .map(f => ("k" + f(0), f(1))) // restored stripped indices: key from column 0, value from column 1
val data = sc.parallelize(dataArray)
//data:RDD[(String,Double)]
// Per-key streaming statistics: StatCounter accumulates count/mean/variance/max/min
// distributively, so no driver-side collect is needed.
val dataStats = data.aggregateByKey(new StatCounter()
)({(s,v)=>s.merge(v)},{(s,t)=>s.merge(t)})
// or, slightly shorter but perhaps over-tricky:
val dataStats = data.aggregateByKey(new StatCounter())(_ merge _ , _ merge _)
// dataStats: RDD[(String, StatCounter)]
// (The two definitions are alternatives — keep only one in real code; a duplicate
// `val dataStats` in the same scope will not compile.)
//Re-format to the OP's format and print
// Re-format to the OP's format and print.
val result = dataStats.map(f=>(f._1,(f._2.variance, f._2.max, f._2.min)))
// result: RDD[(String, (Double, Double, Double))]
// (Original chained .foreach onto the val, which made `result` Unit and contradicted
// the type comment — print as a separate statement.)
result.foreach(println(_))
//output
(k1,(,,))
(k7,(,,))
(k10,(,,))
(k4,(,,))
Version with two columns:
val dataArray = dataString.split("\\n")
  .map(_.split("\\s+")).map(_.map(_.toDouble))
  .map(f => ("k" + f(0), Array(f(1), f(2)))) // restored indices: key col 0, value cols 1 and 2
val data = sc.parallelize(dataArray)
// data: RDD[(String, Array[Double])]
// One StatCounter per value column; merge element-wise in both the sequence op
// (fold a row into the per-partition accumulators) and the combine op (merge
// accumulators across partitions).
val dataStats = data.aggregateByKey(
  Array(new StatCounter(), new StatCounter())
)({ (s, v) => Array(s(0) merge v(0), s(1) merge v(1)) },
  { (s, t) => Array(s(0) merge t(0), s(1) merge t(1)) })
// dataStats: RDD[(String, Array[StatCounter])]
val result = dataStats.map(f => (f._1,
  (f._2(0).variance, f._2(0).max, f._2(0).min), // statistics of the first value column
  (f._2(1).variance, f._2(1).max, f._2(1).min)  // statistics of the second value column
)) // (one stray closing paren removed; indices restored from the stripped original)
result.foreach(println(_))
// result: RDD[(String, (Double, Double, Double), (Double, Double, Double))]
//output
(k1,(,,),(,,))
(k7,(,,),(,,))
(k10,(,,),(,,))
(k4,(,,),(,,))
‘n’ column version:
val n = 2 // number of value columns; original value was stripped — set to match your data
// One StatCounter per column. (Original said `new Counter()`, which does not exist;
// StatCounter is the type used in every other snippet here.)
val dataStats = data.aggregateByKey(List.fill(n)(new StatCounter()))(
  { (s, v) => (s zip v).map { case (si, vi) => si merge vi } },
  { (s, t) => (s zip t).map { case (si, ti) => si merge ti } })
val result = dataStats.map(f => (f._1, f._2.map(x => (x.variance, x.max, x.min))))
result.foreach(println(_))
Output is the same as above but with more columns. You can change `n` as needed; note that this will break if the Array in any row has fewer than `n` elements.
求統計數值V2.0 均值函數
// Compute the per-key mean of a keyed RDD of doubles.
// StatCounter accumulates count and sum distributively, so no driver-side collect is needed.
def mosMean(data:RDD[(String,Double)]): RDD[(String,Double)] ={
  val perKeyStats = data.aggregateByKey(new StatCounter())(
    (acc, value) => acc.merge(value),   // fold one value into a partition-local accumulator
    (left, right) => left.merge(right)) // combine accumulators across partitions
  perKeyStats.map { case (key, stats) => (key, stats.mean) }
}
将結果插入HBase
/**
 * Write each (rowKey, value) pair of `result` into HBase.
 *
 * @param result    RDD of (row key, value) pairs to persist
 * @param tablename target HBase table name
 * @param column    column family to write into
 * @param item      column qualifiers; the same value is written under each qualifier
 */
def saveToHbase(result:RDD[(String,Double)],tablename:String,column:String,item:String*)={
  // One connection + table handle per partition (HTable is not serializable,
  // so it must be created inside foreachPartition, on the executor).
  result.foreachPartition { partition =>
    val myConf = HBaseConfiguration.create()
    myConf.set("hbase.zookeeper.quorum",
      "compute000,compute001,compute002,compute003,compute004," +
      "compute005,compute006,compute007,compute008,compute009,compute010," +
      "compute011,compute012,compute013,compute014,compute015,compute016," +
      "compute017,compute018,compute019,compute020,compute021,compute022," +
      "compute023,compute024,compute025,compute026,compute027,compute028," +
      "compute029,compute030,compute031,compute032,compute033,compute034," +
      "compute035,compute036,compute037,compute038")
    myConf.set("hbase.master", "10.10.10.10:60000")
    myConf.set("hbase.zookeeper.property.clientPort", "2181")
    myConf.set("hbase.defaults.for.version.skip", "true")
    val myTable = new HTable(myConf, tablename)
    myTable.setAutoFlush(false, false) // buffer puts client-side instead of flushing per Put
    // Original buffer size was stripped from the paste; 5 MB is a common choice — tune as needed.
    myTable.setWriteBufferSize(5 * 1024 * 1024)
    try {
      partition.foreach { f =>
        val p = new Put(Bytes.toBytes(f._1))
        for (k <- 0 until item.length) { // restored the stripped `0` lower bound
          p.addColumn(Bytes.toBytes(column), Bytes.toBytes(item(k)), Bytes.toBytes(f._2.toString))
        }
        myTable.put(p)
      }
      myTable.flushCommits()
    } finally {
      myTable.close() // release the connection even if a put/flush fails
    }
  }
}