天天看点

spark源码action系列-saveAsHadoopDataset

RDD.saveAsHadoopDataset

这个功能是spark中的saveAsTextFile,saveASHadoopFile的基础实现.

这个action用于把task中的数据通过指定的output format写入到hadoop的实现接口中,

由PairRDDFunctions类进行实现.

执行前的准备:

得到hadoopConfiguration的实例,取出OutputFormat的实现类,key,value的类.

val hadoopConf = conf

val outputFormatInstance = hadoopConf.getOutputFormat

val keyClass = hadoopConf.getOutputKeyClass

val valueClass = hadoopConf.getOutputValueClass

.....................................

SparkHadoopUtil.get.addCredentials(hadoopConf)

logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +

  valueClass.getSimpleName + ")")

if (isOutputSpecValidationEnabled) {

  // FileOutputFormat ignores the filesystem parameter

  val ignoredFs = FileSystem.get(hadoopConf)

  hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)

}

生成一个用于执行写入操作的writer实例,这个实例中包含有向hadoop写入的recordWriter.

在这里recordWriter实例还没有具体的生成.

val writer = new SparkHadoopWriter(hadoopConf)

writer.preSetup()

Task的执行的function的定义:

这里生成的writeToFile是一个需要在task中执行的function的定义.

val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {

1,先得到执行尝试的id.

  // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than 

       Int.MaxValue, roll it

  // around by taking a mod. We expect that no task will be attempted 2 billion times.

  val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt

2,生成一个用于写入的metrics的实例.

  val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)

3,根据stageId,partition,attempt对writer进行初始化,主要是对task对应的jobId,taskid进行初始化操作.

  writer.setup(context.stageId, context.partitionId, taskAttemptId)

4,这里(如果是对hdfs文件进行操作,生成对应此task的输出文件的路径),通过outputFormat生成对应的RecordWriter的实例.

  writer.open()

  var recordsWritten = 0L

  Utils.tryWithSafeFinally {

5,这里通过对partition中的数据进行迭代,并通过recordWriter写入数据,记录metrics

    while (iter.hasNext) {

      val record = iter.next()

      writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])

      // Update bytes written metric every few records

      maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)

      recordsWritten += 1

    }

  } {

执行对iterator的写入操作后,关闭recordWriter的实例,也就是调用其关闭函数.

    writer.close()

  }

根据mapred.output.committer.class配置的OutputCommitter实现,执行commitTask操作.

  writer.commit()

  bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }

  outputMetrics.setRecordsWritten(recordsWritten)

}

这里通过runJob并把上面生成的函数传进去,执行task

self.context.runJob(self, writeToFile)

任务结束后的处理:

根据mapred.output.committer.class配置的OutputCommitter实现,执行commitJob操作.

writer.commitJob()