This article is reproduced from: https://www.jianshu.com/p/dcfc0b6ae0ea
Republished here with the author's permission. For any form of reproduction, please contact the original author (薛定谔的貓Plus) for authorization and credit the source.
Structured Streaming adds support for processing streaming data with SQL, handled separately in the sql package. StreamExecution is the base class of the two stream-processing modes discussed below: whenever new data arrives at a data source, the streaming query generates a QueryExecution to run and writes the result to the specified Sink (where the processed data ends up).
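Before diving into the two implementations, here is a minimal, illustrative sketch of the user-facing side of such a query (not taken from the Spark source; the rate source and console sink are stand-ins), so the internals below have a concrete anchor:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("structured-streaming-sketch")   // hypothetical app name
  .master("local[2]")
  .getOrCreate()

// The built-in "rate" source emits (timestamp, value) rows and stands in for a
// real source such as Kafka purely for illustration.
val input = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()

// start() returns a StreamingQuery backed by a StreamExecution subclass; each time
// new data is available, a QueryExecution is produced for that batch and its result
// is handed to the console Sink.
val query = input.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()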
MicroBatchExecution
This is the micro-batch mode. By default it is fired periodically by the ProcessingTimeExecutor trigger, which runs on the system clock:
case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = new SystemClock())
  extends TriggerExecutor with Logging {

  private val intervalMs = processingTime.intervalMs
  require(intervalMs >= 0)

  override def execute(triggerHandler: () => Boolean): Unit = {
    while (true) {
      val triggerTimeMs = clock.getTimeMillis
      val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
      val terminated = !triggerHandler()
      if (intervalMs > 0) {
        val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
        if (batchElapsedTimeMs > intervalMs) {
          notifyBatchFallingBehind(batchElapsedTimeMs)
        }
        if (terminated) {
          return
        }
        clock.waitTillTime(nextTriggerTimeMs)
      } else {
        if (terminated) {
          return
        }
      }
    }
  }

  // nextBatchTime(...) and notifyBatchFallingBehind(...) are defined further down in the class (omitted)
}
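From the user's side, this trigger is selected with Trigger.ProcessingTime. The sketch below (df stands for any streaming DataFrame and is hypothetical) shows the call; Trigger.Once() would instead run a single batch and stop:

import org.apache.spark.sql.streaming.Trigger

// With this trigger, MicroBatchExecution drives batches through the
// ProcessingTimeExecutor loop shown above, using the system clock.
val query = df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()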
The execution logic first builds a logical plan that marks which data sources the data will be pulled from:
override lazy val logicalPlan: LogicalPlan = {
  assert(queryExecutionThread eq Thread.currentThread,
    "logicalPlan must be initialized in QueryExecutionThread " +
      s"but the current thread was ${Thread.currentThread}")
  var nextSourceId = 0L
  val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
  val v2ToExecutionRelationMap = MutableMap[StreamingRelationV2, StreamingExecutionRelation]()
  // We transform each distinct streaming relation into a StreamingExecutionRelation, keeping a
  // map as we go to ensure each identical relation gets the same StreamingExecutionRelation
  // object. For each microbatch, the StreamingExecutionRelation will be replaced with a logical
  // plan for the data within that batch.
  // Note that we have to use the previous `output` as attributes in StreamingExecutionRelation,
  // since the existing logical plan has already used those attributes. The per-microbatch
  // transformation is responsible for replacing attributes with their final values.
  val _logicalPlan = analyzedPlan.transform {
    case streamingRelation @ StreamingRelation(dataSource, _, output) =>
      toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
        // Materialize source to avoid creating it in every batch
        val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
        val source = dataSource.createSource(metadataPath)
        nextSourceId += 1
        StreamingExecutionRelation(source, output)(sparkSession)
      })
    case s @ StreamingRelationV2(source: MicroBatchReadSupport, _, options, output, _) =>
      v2ToExecutionRelationMap.getOrElseUpdate(s, {
        // Materialize source to avoid creating it in every batch
        val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
        val reader = source.createMicroBatchReader(
          Optional.empty(), // user specified schema
          metadataPath,
          new DataSourceOptions(options.asJava))
        nextSourceId += 1
        StreamingExecutionRelation(reader, output)(sparkSession)
      })
    case s @ StreamingRelationV2(_, sourceName, _, output, v1Relation) =>
      v2ToExecutionRelationMap.getOrElseUpdate(s, {
        // Materialize source to avoid creating it in every batch
        val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
        if (v1Relation.isEmpty) {
          throw new UnsupportedOperationException(
            s"Data source $sourceName does not support microbatch processing.")
        }
        val source = v1Relation.get.dataSource.createSource(metadataPath)
        nextSourceId += 1
        StreamingExecutionRelation(source, output)(sparkSession)
      })
  }
  sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
  uniqueSources = sources.distinct
  _logicalPlan
}
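As an illustration (broker address and topic name are hypothetical), a Kafka input declared as below enters the analyzed plan as a StreamingRelation; the transform above turns it into a StreamingExecutionRelation whose source is materialized once under $resolvedCheckpointRoot/sources/0:

val kafkaInput = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // hypothetical brokers
  .option("subscribe", "events")                      // hypothetical topic
  .option("startingOffsets", "latest")
  .load()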
Taking Kafka as an example: during execution the Kafka offset ranges are built up in the populateStartOffsets and constructNextBatch methods, and the actual data extraction then happens in runBatch:
newData = reportTimeTaken("getBatch") {
  availableOffsets.flatMap {
    case (source: Source, available)
      if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
      val current = committedOffsets.get(source)
      // Based on the requested offset range (which contains an offset range for every
      // partition), this builds a DataFrame over the new Kafka data.
      val batch = source.getBatch(current, available)
      // ... each source is then mapped to batch.logicalPlan, which is what newData holds
The DataFrame produced here is then used to replace the data sources in the original logicalPlan:
val newBatchesPlan = logicalPlan transform {
  case StreamingExecutionRelation(source, output) =>
    newData.get(source).map { dataPlan =>
      assert(output.size == dataPlan.output.size,
        s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
          s"${Utils.truncatedString(dataPlan.output, ",")}")
      replacements ++= output.zip(dataPlan.output)
      dataPlan
    }.getOrElse {
      LocalRelation(output, isStreaming = true)
    }
}
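For reference, the source.getBatch(current, available) call above belongs to the v1 Source contract, reproduced here in slightly simplified form (the real trait lives in org.apache.spark.sql.execution.streaming); the Kafka source implements getBatch by turning the per-partition offset ranges into a DataFrame:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Offset
import org.apache.spark.sql.types.StructType

// Slightly simplified reproduction of the v1 streaming Source interface.
trait Source {
  def schema: StructType                                       // fixed schema of the stream
  def getOffset: Option[Offset]                                // latest available offset, if any
  def getBatch(start: Option[Offset], end: Offset): DataFrame  // data in the range (start, end]
  def commit(end: Offset): Unit = {}                           // offsets up to `end` may be discarded
  def stop(): Unit
}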
From this logical plan, a new IncrementalExecution is then created to form the execution plan:
reportTimeTaken("queryPlanning") {
lastExecution = new IncrementalExecution(
sparkSessionToRunBatch,
triggerLogicalPlan,
outputMode,
checkpointFile("state"),
runId,
currentBatchId,
offsetSeqMetadata)
lastExecution.executedPlan // Force the lazy generation of execution plan
}
val nextBatch =
new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema))
The result is then handled according to the type of sink; SQLExecution.withNewExecutionId is mainly used to track the jobs' information:
reportTimeTaken("addBatch") {
SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
sink match {
case s: Sink =>
if (s.isInstanceOf[MemorySinkExtend]) {
s.addBatch(currentBatchId, nextBatch, batchIdOffsetMap.get(currentBatchId).getOrElse((None, None)))
} else {
s.addBatch(currentBatchId, nextBatch, (None, None))
}
case _: StreamWriteSupport =>
// This doesn't accumulate any data - it just forces execution of the microbatch writer.
nextBatch.collect()
}
}
}
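To make the Sink side concrete, here is a minimal sketch of a custom v1 Sink (illustrative only; LoggingSink is hypothetical). Note that stock Spark's Sink.addBatch takes two parameters; the three-argument variant above appears to come from a customized MemorySinkExtend:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

class LoggingSink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // `data` is backed by the IncrementalExecution built above, so any action here
    // runs the incremental plan for this micro-batch. A real sink should also be
    // idempotent, because a batchId can be replayed after recovery.
    println(s"batch $batchId: ${data.count()} rows")
  }
}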
One question still open is how the watermark is handled during the computation, so let's continue the analysis.
During execution, the watermark is advanced along with the event time carried in the data:
if (hasNewData) {
  var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
  // Update the eventTime watermarks if we find any in the plan.
  if (lastExecution != null) {
    lastExecution.executedPlan.collect {
      case e: EventTimeWatermarkExec => e
    }.zipWithIndex.foreach {
      case (e, index) if e.eventTimeStats.value.count > 0 =>
        logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
        val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
        val prevWatermarkMs = watermarkMsMap.get(index)
        if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
          watermarkMsMap.put(index, newWatermarkMs)
        }
      // ... the smallest value in watermarkMsMap then becomes the new global batchWatermarkMs
In the subsequent execution stage, an expression is generated from this watermark and used to filter data at output time:
// statefulOperators.scala
lazy val watermarkExpression: Option[Expression] = {
  WatermarkSupport.watermarkExpression(
    child.output.find(_.metadata.contains(EventTimeWatermark.delayKey)),
    eventTimeWatermark)
}

/** Predicate based on keys that matches data older than the watermark */
lazy val watermarkPredicateForKeys: Option[Predicate] = watermarkExpression.flatMap { e =>
  if (keyExpressions.exists(_.metadata.contains(EventTimeWatermark.delayKey))) {
    Some(newPredicate(e, keyExpressions))
  } else {
    None
  }
}

/** Predicate based on the child output that matches data older than the watermark. */
lazy val watermarkPredicateForData: Option[Predicate] =
  watermarkExpression.map(newPredicate(_, child.output))
At the output stage, depending on the output mode, the watermark is used to filter the aggregated data read from HDFSBackedStateStoreProvider and to delete aggregation state that is no longer needed.
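As a usage-side illustration (column names and thresholds are hypothetical), the delayMs seen in EventTimeWatermarkExec comes from withWatermark; in Append mode a window is only emitted, and its entries dropped from the state store, once the watermark has passed the end of that window:

import org.apache.spark.sql.functions.{col, window}

val windowedCounts = events                         // `events` is a streaming DataFrame with an eventTime column
  .withWatermark("eventTime", "10 minutes")         // delayMs = 10 minutes
  .groupBy(window(col("eventTime"), "5 minutes"), col("userId"))
  .count()

val aggQuery = windowedCounts.writeStream
  .outputMode("append")
  .format("console")
  .start()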
ContinuousExecution
Its execution logic is similar to the above, except that offsets are saved asynchronously while the data in the stream is processed continuously.
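As an illustration (sink options are hypothetical), continuous mode is selected from the user side with a continuous trigger; both the source and the sink must support continuous processing, e.g. the Kafka source and sink:

import org.apache.spark.sql.streaming.Trigger

val continuousQuery = kafkaInput.writeStream        // `kafkaInput` from the earlier sketch
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("topic", "events-out")                    // hypothetical output topic
  .option("checkpointLocation", "/tmp/continuous-checkpoint")
  .trigger(Trigger.Continuous("1 second"))          // how often offsets are checkpointed, not a batch interval
  .start()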