
Computing mean, variance, and other statistics on a Scala pair RDD

1. Computing the statistics, V1.0

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer

val conf = new SparkConf().setAppName("hh")
conf.setMaster("local[3]")
val sc = new SparkContext(conf)

// each line of i.txt holds tab-separated numbers; column 0 becomes the key, column 1 the value
val data = sc.textFile("/home/hadoop4/Desktop/i.txt")
  .map(_.split("\t")).map(_.map(_.toDouble))
  .map(f => ("k" + f(0), f(1)))

//  variance
//data:RDD[(String,Double)]
val dataArr = data.map(f=>(f._1,ArrayBuffer(f._2)))
//dataArr  RDD[(String,ArrayBuffer[Double])]

dataArr.collect().foreach(println(_))
//output
(k1,ArrayBuffer())
(k1,ArrayBuffer())
(k4,ArrayBuffer())
(k4,ArrayBuffer())
(k7,ArrayBuffer())
(k10,ArrayBuffer())
(k10,ArrayBuffer())
(k10,ArrayBuffer())
(k1,ArrayBuffer())
(k10,ArrayBuffer())
(k10,ArrayBuffer())
(k1,ArrayBuffer())
(k4,ArrayBuffer())


val dataArrRed = dataArr.reduceByKey((x,y)=>x++=y)
//dataArrRed :RDD[(String,ArrayBuffer[Double])]
dataArrRed.collect().foreach(println(_))
//output
(k1,ArrayBuffer(, , , ))
(k7,ArrayBuffer())
(k10,ArrayBuffer(, , , , ))
(k4,ArrayBuffer(, , ))

// collect to the driver, then turn each key's buffer into its own RDD so variance/max/min are available
val dataARM = dataArrRed.collect().map(
  f => (f._1, sc.makeRDD(f._2)))
val dataARMM = dataARM.map(
  f => (f._1, (f._2.variance(), f._2.max(), f._2.min())))
  .foreach(println(_))
sc.stop()

//output
(k1,(,,))
(k7,(,,))
(k10,(,,))
(k4,(,,))
           

Because the results have to be collected to the driver after the reduceByKey, the job fails when driver.maxResultSize is not large enough. driver.maxResultSize defaults to 1g and will throw an error once the data gets large, so I set driver.maxResultSize to 10g.
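For reference, a minimal sketch of how that limit can be raised, either on the SparkConf before the SparkContext is created or via spark-submit (the 10g value simply mirrors what I used above):

// set before creating the SparkContext; "spark.driver.maxResultSize" is the full config key
val conf = new SparkConf()
  .setAppName("hh")
  .set("spark.driver.maxResultSize", "10g")
val sc = new SparkContext(conf)

// equivalently on the command line:
// spark-submit --conf spark.driver.maxResultSize=10g ...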

2. Computing statistics for multiple columns at once in Spark

The method above still has a problem though: how can the collect be avoided?

The solution comes from the question I asked on StackOverflow: how to avoid collect when using statistic stat

data:

val dataString = xx.trim // trim strips leading/trailing whitespace
val dataArray = dataString.split("\\n")
                .map(_.split("\\s+")) // \\s matches whitespace (space, tab, newline, ...); + means one or more
                .map(_.map(_.toDouble))
                .map(f => ("k" + f(0), f(1)))
val data = sc.parallelize(dataArray)
//data:RDD[(String,Double)]

import org.apache.spark.util.StatCounter

val dataStats = data.aggregateByKey(new StatCounter())(
  (s, v) => s.merge(v), (s, t) => s.merge(t))

//or, slightly shorter but perhaps over-tricky:
val dataStats = data.aggregateByKey(new StatCounter())(_ merge _ , _ merge _)
//dataStats:RDD[(String, StatCounter)]

//Re-format to the OP's format and print 
val result = dataStats.map(f=>(f._1,(f._2.variance, f._2.max, f._2.min)))
.foreach(println(_))
//result=dataStats.map(...)  :RDD[(String,(Double,Double,Double))]


//output
(k1,(,,))
(k7,(,,))
(k10,(,,))
(k4,(,,))
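Since StatCounter accumulates everything in a single pass, the same dataStats RDD can also hand back count, mean, sum and stdev with no extra job; a small sketch:

// each StatCounter also carries count, mean, sum, stdev, sampleVariance, ...
dataStats.map { case (k, s) => (k, s.count, s.mean, s.stdev) }
  .foreach(println(_))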
           

Version with two columns:

val dataArray = dataString.split("\\n")
                .map(_.split("\\s+")).map(_.map(_.toDouble))
                .map(f => ("k" + f(0), Array(f(1), f(2))))
val data = sc.parallelize(dataArray)
//data:RDD[(String,Array[Double])]

val dataStats = data.aggregateByKey(
  Array(new StatCounter(), new StatCounter()))(
  (s, v) => Array(s(0) merge v(0), s(1) merge v(1)),
  (s, t) => Array(s(0) merge t(0), s(1) merge t(1)))
//dataStats:RDD[(String, Array[StatCounter])]

val result = dataStats.map(f => (f._1,
                        (f._2(0).variance, f._2(0).max, f._2(0).min),
                        (f._2(1).variance, f._2(1).max, f._2(1).min)
                        )).foreach(println(_))
//result=dataStats.map(...)  :RDD[(String,(Double,Double,Double),(Double,Double,Double))]

//output
(k1,(,,),(,,))
(k7,(,,),(,,))
(k10,(,,),(,,))
(k4,(,,),(,,))

           

‘n’ column version:

val n = 2 // assumed example: number of value columns per row
val dataStats = data.aggregateByKey(List.fill(n)(new StatCounter()))(
  (s, v) => (s zip v).map { case (si, vi) => si merge vi },
  (s, t) => (s zip t).map { case (si, ti) => si merge ti })

val result = dataStats.map(f=>(f._1, f._2.map(x=>(x.variance, x.max, x.min))))
    .foreach(println(_))
           

The output is the same as above, just with more columns; you can change n, but it will break if the Array in any row has fewer than n elements.
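One way to guard against that (a sketch, assuming short rows should simply be dropped rather than padded; cleanData is an illustrative name):

// rows with fewer than n values would silently shrink the zipped result (or break it), so drop them first
val cleanData = data.filter { case (_, values) => values.length >= n }
val dataStats = cleanData.aggregateByKey(List.fill(n)(new StatCounter()))(
  (s, v) => (s zip v).map { case (si, vi) => si merge vi },
  (s, t) => (s zip t).map { case (si, ti) => si merge ti })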

Computing statistics V2.0: a mean function

import org.apache.spark.rdd.RDD

// per-key mean computed with aggregateByKey + StatCounter
def mosMean(data: RDD[(String, Double)]): RDD[(String, Double)] = {
  val dataStats = data.aggregateByKey(new StatCounter())(_ merge _, _ merge _)
  val result = dataStats.map(f => (f._1, f._2.mean))
  result
}
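A quick usage sketch (data is the (String, Double) pair RDD built earlier; printing via collect is just for a small demo):

val means = mosMean(data) // RDD[(String, Double)] of per-key means
means.collect().foreach(println(_))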
           

Inserting the results into HBase

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

def saveToHbase(result: RDD[(String, Double)], tablename: String, column: String, item: String*) = {
 result.foreachPartition { x =>
   val myConf = HBaseConfiguration.create()
   myConf.set("hbase.zookeeper.quorum",
     "compute000,compute001,compute002,compute003,compute004," +
       "compute005,compute006,compute007,compute008,compute009,compute010," +
       "compute011,compute012,compute013,compute014,compute015,compute016," +
       "compute017,compute018,compute019,compute020,compute021,compute022," +
       "compute023,compute024,compute025,compute026,compute027,compute028," +
       "compute029,compute030,compute031,compute032,compute033,compute034," +
       "compute035,compute036,compute037,compute038")
   myConf.set("hbase.master", "10.10.10.10:60000")
   myConf.set("hbase.zookeeper.property.clientPort", "2181")
   myConf.set("hbase.defaults.for.version.skip", "true")
   val myTable = new HTable(myConf, tablename)
   myTable.setAutoFlush(false, false)
   myTable.setWriteBufferSize(5 * 1024 * 1024) // assumed example: 5 MB write buffer

   x.foreach { f => 
     val p = new Put(Bytes.toBytes(f._1))
     for (k <- 0 until item.length) {
       p.addColumn(Bytes.toBytes(column), Bytes.toBytes(item(k)), Bytes.toBytes(f._2.toString))
     }
     myTable.put(p)
   }
   myTable.flushCommits()
 }
}
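Putting the two pieces together, a hedged end-to-end sketch (the table name "stats", column family "cf" and qualifier "mean" are placeholders, not values from the original):

// assumed names: HBase table "stats", column family "cf", qualifier "mean"
val means = mosMean(data)
saveToHbase(means, "stats", "cf", "mean")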
           
