Getting Started with Spark: Computing the Median

The data is as follows:

1 2 3 4 5 6 8 9 11 12 13 15 18 20 22 23 25 27 29
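There are 19 values here; sorted, the median is the 10th smallest, which is 12, so 12 is the number the program should print at the end.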

The code is as follows:

import org.apache.spark.{SparkConf, SparkContext}
import scala.util.control.Breaks._

/**
 * Created by xuyao on 15-7-24.
 * Computes the median of data that is stored in distributed fashion.
 * Split the value range into K buckets, count the elements in each bucket,
 * and count the total number of elements. From the bucket counts and the
 * total count we can tell which bucket the median falls into and its
 * offset within that bucket, and then extract it.
 */
object Median {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Median")
    val sc = new SparkContext(conf)
    // textFile reads everything as strings, so a type conversion is needed
    val data = sc.textFile("data").flatMap(x => x.split(' ')).map(x => x.toInt)
    // Bucket the values with a width of 4 (the data set here is small, of course)
    val mappeddata = data.map(x => (x / 4, x)).sortByKey()
    // p_count is the number of elements in each bucket
    val p_count = data.map(x => (x / 4, 1)).reduceByKey(_ + _).sortByKey()
    p_count.foreach(println)
    // p_count is an RDD, so Scala Map operations cannot be applied to it directly;
    // collectAsMap converts it into a Scala collection
    val scala_p_count = p_count.collectAsMap()
    // Look up a value by its key
    println(scala_p_count(1))
    // sum_count is the total number of elements. count() cannot be used here,
    // because it would return the number of (key, value) pairs instead.
    val sum_count = p_count.map(x => x._2).sum().toInt
    println(sum_count)
    var temp = 0   // running element count up to and including the median's bucket
    var temp2 = 0  // element count of all buckets before the median's bucket
    var index = 0  // index of the bucket containing the median
    var mid = 0
    if (sum_count % 2 != 0) {
      mid = sum_count / 2 + 1 // 1-based position of the median in the full data
    } else {
      mid = sum_count / 2
    }
    val pcount = p_count.count()
    // Walk the buckets in key order (this assumes the bucket keys are the
    // contiguous integers 0 .. pcount-1, which holds for this data set)
    breakable {
      for (i <- 0 to pcount.toInt - 1) {
        temp = temp + scala_p_count(i)
        temp2 = temp - scala_p_count(i)
        if (temp >= mid) {
          index = i
          break
        }
      }
    }
    println(mid + " " + index + " " + temp + " " + temp2)
    // Offset of the median within its bucket
    val offset = mid - temp2
    // takeOrdered sorts ascending by key (by default) and returns the first n elements
    val result = mappeddata.filter(x => x._1 == index).takeOrdered(offset)
    println(result(offset - 1)._2)
    sc.stop()
  }
}
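As a sanity check on the bucket method, the same median can be computed by sorting the RDD globally and looking up the middle rank. The sketch below is not from the original program (the object name MedianCheck is made up for illustration); it assumes the same space-separated "data" file and an odd element count, and it shuffles the whole data set, so it trades efficiency for simplicity:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical cross-check of Median, not part of the original code.
object MedianCheck {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("MedianCheck"))
    val data = sc.textFile("data").flatMap(_.split(' ')).map(_.toInt)
    val n = data.count()
    // Sort globally, then pair each value with its 0-based rank
    val ranked = data.sortBy(identity).zipWithIndex().map(_.swap)
    val mid = (n - 1) / 2 // rank of the median when n is odd
    println(ranked.lookup(mid).head) // prints 12 for the sample data
    sc.stop()
  }
}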

The run output is as follows:

/usr/lib/jvm/java-7-sun/bin/java -Dspark.master=local -Didea.launcher.port=7535 -Didea.launcher.bin.path=/opt/idea/bin -Dfile.encoding=UTF-8 -classpath /usr/lib/jvm/java-7-sun/jre/lib/jfr.jar:/usr/lib/jvm/java-7-sun/jre/lib/javaws.jar:/usr/lib/jvm/java-7-sun/jre/lib/resources.jar:/usr/lib/jvm/java-7-sun/jre/lib/plugin.jar:/usr/lib/jvm/java-7-sun/jre/lib/jfxrt.jar:/usr/lib/jvm/java-7-sun/jre/lib/jsse.jar:/usr/lib/jvm/java-7-sun/jre/lib/charsets.jar:/usr/lib/jvm/java-7-sun/jre/lib/deploy.jar:/usr/lib/jvm/java-7-sun/jre/lib/management-agent.jar:/usr/lib/jvm/java-7-sun/jre/lib/rt.jar:/usr/lib/jvm/java-7-sun/jre/lib/jce.jar:/usr/lib/jvm/java-7-sun/jre/lib/ext/sunpkcs11.jar:/usr/lib/jvm/java-7-sun/jre/lib/ext/sunjce_provider.jar:/usr/lib/jvm/java-7-sun/jre/lib/ext/sunec.jar:/usr/lib/jvm/java-7-sun/jre/lib/ext/dnsns.jar:/usr/lib/jvm/java-7-sun/jre/lib/ext/zipfs.jar:/usr/lib/jvm/java-7-sun/jre/lib/ext/localedata.jar:/opt/IdeaProjects/SparkTest/target/scala-2.10/classes:/home/xuyao/.sbt/boot/scala-2.10.4/lib/scala-library.jar:/home/xuyao/spark/lib/spark-assembly-1.4.0-hadoop2.4.0.jar:/opt/idea/lib/idea_rt.jar com.intellij.rt.execution.application.AppMain Median
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/07/29 12:43:28 INFO SparkContext: Running Spark version 1.4.0
15/07/29 12:43:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/07/29 12:43:29 WARN Utils: Your hostname, hadoop resolves to a loopback address; using 192.168.73.129 instead (on interface eth0)
15/07/29 12:43:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
... [SecurityManager and Remoting startup INFO lines omitted; Remoting listens on akka.tcp://sparkDriver@192.168.73.129:58364] ...
15/07/29 12:43:32 INFO Utils: Successfully started service 'sparkDriver' on port 58364.
15/07/29 12:43:32 INFO SparkEnv: Registering MapOutputTracker
15/07/29 12:43:33 INFO SparkEnv: Registering BlockManagerMaster
15/07/29 12:43:33 INFO DiskBlockManager: Created local directory at /tmp/spark-329d9ad9-4ed6-4a79-97f3-254cab1a13b8/blockmgr-f9da5521-a9c0-4801-bffb-3a92f089d1cd
15/07/29 12:43:33 INFO MemoryStore: MemoryStore started with capacity 131.6 MB
15/07/29 12:43:33 INFO HttpFileServer: HTTP File server directory is /tmp/spark-329d9ad9-4ed6-4a79-97f3-254cab1a13b8/httpd-fd2adba3-06b9-4035-9c2b-6733e379207a
15/07/29 12:43:33 INFO HttpServer: Starting HTTP Server
15/07/29 12:43:33 INFO Utils: Successfully started service 'HTTP file server' on port 58175.
15/07/29 12:43:33 INFO SparkEnv: Registering OutputCommitCoordinator
15/07/29 12:43:38 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/07/29 12:43:38 INFO SparkUI: Started SparkUI at http://192.168.73.129:4040
15/07/29 12:43:39 INFO Executor: Starting executor ID driver on host localhost
15/07/29 12:43:39 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56974.
15/07/29 12:43:39 INFO NettyBlockTransferService: Server created on 56974
15/07/29 12:43:39 INFO BlockManagerMaster: Trying to register BlockManager
15/07/29 12:43:39 INFO BlockManagerMasterEndpoint: Registering block manager localhost:56974 with 131.6 MB RAM, BlockManagerId(driver, localhost, 56974)
15/07/29 12:43:39 INFO BlockManagerMaster: Registered BlockManager
15/07/29 12:43:40 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
15/07/29 12:43:41 INFO MemoryStore: ensureFreeSpace(137512) called with curMem=0, maxMem=137948037
15/07/29 12:43:41 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 134.3 KB, free 131.4 MB)
15/07/29 12:43:41 INFO MemoryStore: ensureFreeSpace(12633) called with curMem=137512, maxMem=137948037
15/07/29 12:43:41 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 12.3 KB, free 131.4 MB)
15/07/29 12:43:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:56974 (size: 12.3 KB, free: 131.5 MB)
15/07/29 12:43:41 INFO SparkContext: Created broadcast 0 from textFile at Median.scala:15
15/07/29 12:43:41 INFO FileInputFormat: Total input paths to process : 1
15/07/29 12:43:41 INFO SparkContext: Starting job: foreach at Median.scala:20
15/07/29 12:43:41 INFO DAGScheduler: Registering RDD 6 (map at Median.scala:19)
15/07/29 12:43:41 INFO DAGScheduler: Registering RDD 7 (reduceByKey at Median.scala:19)
15/07/29 12:43:41 INFO DAGScheduler: Got job 0 (foreach at Median.scala:20) with 1 output partitions (allowLocal=false)
15/07/29 12:43:41 INFO DAGScheduler: Final stage: ResultStage 2(foreach at Median.scala:20)
15/07/29 12:43:41 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 1)
15/07/29 12:43:41 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 1)
15/07/29 12:43:41 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[6] at map at Median.scala:19), which has no missing parents
15/07/29 12:43:41 INFO MemoryStore: ensureFreeSpace(4168) called with curMem=150145, maxMem=137948037
15/07/29 12:43:41 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.1 KB, free 131.4 MB)
15/07/29 12:43:41 INFO MemoryStore: ensureFreeSpace(2376) called with curMem=154313, maxMem=137948037
15/07/29 12:43:41 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.3 KB, free 131.4 MB)
15/07/29 12:43:41 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:56974 (size: 2.3 KB, free: 131.5 MB)
15/07/29 12:43:41 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:874
15/07/29 12:43:41 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[6] at map at Median.scala:19)
15/07/29 12:43:41 INFO TaskSchedulerImpl: Adding task set  with  tasks
... [task execution INFO lines for the first two stages omitted] ...
(0,3)
(1,3)
(2,3)
(3,3)
(4,1)
(5,3)
(6,2)
(7,1)
... [collectAsMap job INFO lines omitted] ...
3
... [sum job INFO lines omitted] ...
19
... [count job INFO lines omitted] ...
10 3 12 9
... [takeOrdered job INFO lines omitted] ...
12
... [SparkContext shutdown INFO lines omitted] ...
15/07/29 12:43:43 INFO Utils: Shutdown hook called
15/07/29 12:43:43 INFO Utils: Deleting directory /tmp/spark-329d9ad9-4ed6-4a79-97f3-254cab1a13b8

Process finished with exit code 0
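The last program output line, 12, matches the median computed by hand from the sorted data at the top of the post.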