
Implementing Hadoop's setup and cleanup in Spark

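import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
import org.apache.spark.SparkContext

object SetupCleanupJob { // object name is arbitrary
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "xxx")
    val inputData = sc.textFile("hdfs://master:8020/data/spark/user-history-data")
    val lines = inputData.map(line => (line, line.length))

    val result = lines.mapPartitions { valueIterator =>
      if (valueIterator.isEmpty) {
        Iterator[String]()
      } else {
        // setup: runs once per partition, like Mapper.setup() in Hadoop.
        // newInstance avoids closing the cached FileSystem shared within the JVM.
        val fs: FileSystem = FileSystem.newInstance(new Configuration()) //setup FileSystem
        val outputs = mutable.Map[String, FSDataOutputStream]() //setup one open stream per file name

        valueIterator.map { item =>
          val record = item._1 + ":" + item._2
          // File name is the first word of the line (fields assumed whitespace-separated).
          val name = item._1.split("\\s+")(0)
          val outputFile = outputs.getOrElseUpdate(name,
            fs.create(new Path("/home/xxx/opt/data/spark/" + name + ".txt")))
          outputFile.write((record + "\n").getBytes("UTF-8"))
          if (!valueIterator.hasNext) {
            // cleanup: runs once, after the last record of the partition
            outputs.values.foreach(_.close()) //cleanup output files
            fs.close() //cleanup fs
          }
          record
        }
      }
    }

    result.foreach(println(_))
    sc.stop()
  }
}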

Take the following data on HDFS:

zhangsan 1 2015-07-30 20:01:01 127.0.0.1

zhangsan 2 2015-07-30 20:01:01 127.0.0.1

zhangsan 3 2015-07-30 20:01:01 127.0.0.1

zhangsan 4 2015-07-31 20:01:01 127.0.0.1

zhangsan 5 2015-07-31 20:21:01 127.0.0.1

lisi 1 2015-07-30 21:01:01 127.0.0.1

lisi 2 2015-07-30 22:01:01 127.0.0.1

lisi 3 2015-07-31 23:31:01 127.0.0.1

lisi 4 2015-07-31 22:21:01 127.0.0.1

lisi 5 2015-07-31 23:11:01 127.0.0.1

wangwu 1 2015-07-30 21:01:01 127.0.0.1

wangwu 2 2015-07-30 22:01:01 127.0.0.1

wangwu 3 2015-07-31 23:31:01 127.0.0.1

wangwu 4 2015-07-31 22:21:01 127.0.0.1

wangwu 5 2015-07-31 23:11:01 127.0.0.1

Read it into Spark, compute the length of each line, then write the data to local files (each file is named after the first word of its line).
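
The per-line work described here can be tried without a cluster. Below is a minimal sketch in plain Scala; the object name `LineRecord` and both helper names are made up for illustration, and it assumes the fields are separated by whitespace.

```scala
// Hypothetical helpers (not part of Spark or Hadoop), for illustration only.
object LineRecord {
  // File name taken from the first whitespace-separated word of the line.
  def fileNameFor(line: String): String =
    line.trim.split("\\s+")(0) + ".txt"

  // The record written for each line: the line itself, a colon, then its length.
  def format(line: String): String =
    line + ":" + line.length
}
```

In the job, `format` would correspond to the string built from `item._1` and `item._2`, and `fileNameFor` to the name of the file that string is written to.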

The end result reproduces the setup and cleanup hooks of a Hadoop job.
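
Because the iterator passed to `mapPartitions` is consumed lazily, cleanup has to be tied to the moment the last element is produced. One way to package that is a small iterator wrapper. This is only a sketch: `SetupCleanup` and `mapWithCleanup` are hypothetical names, not Spark API, and the sketch assumes the cleanup action does not throw.

```scala
// Hypothetical helper (not Spark API): maps over an iterator and runs
// `cleanup` exactly once, as soon as the input is known to be exhausted.
object SetupCleanup {
  def mapWithCleanup[A, B](input: Iterator[A])(f: A => B)(cleanup: () => Unit): Iterator[B] =
    new Iterator[B] {
      private var cleaned = false

      // Guard so cleanup never runs twice.
      private def finish(): Unit =
        if (!cleaned) { cleaned = true; cleanup() }

      def hasNext: Boolean = {
        val more = input.hasNext
        if (!more) finish() // also covers an empty input
        more
      }

      def next(): B = {
        val out = f(input.next())
        if (!input.hasNext) finish() // last element was just produced
        out
      }
    }
}
```

Inside `mapPartitions`, the setup code would run first (for example, opening the `FileSystem`), and the function would then return `SetupCleanup.mapWithCleanup(valueIterator)(...)(() => fs.close())`. Unlike a `hasNext` check inside `map`, the wrapper also releases resources when the partition is empty.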

The following links are strongly recommended reading:

http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%[email protected].com%3E

http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

http://apache-spark-user-list.1001560.n3.nabble.com/how-to-split-RDD-by-key-and-save-to-different-path-td11887.html#a11983

http://stackoverflow.com/questions/24520225/writing-to-hadoop-distributed-file-system-multiple-times-with-spark
