目錄:
1、有向無環圖
2、代碼結構
3、代碼學習步驟及方法
4、重點代碼講解
5、代碼展現
6、運作結果
———————————————————————————————————
1、有向無環圖
在圖論中,如果一個有向圖無法從某個頂點出發經過若干條邊回到該點,則這個圖是一個有向無環圖(DAG圖)。
因為有向圖中一個點經過兩種路線到達另一個點未必形成環,所以有向無環圖未必能轉化成樹,但任何有向樹均為有向無環圖。
性質:有向無環圖的生成樹個數等於入度非零的節點的入度積。

2、代碼結構
3、代碼學習步驟及方法
1、本文中涉及到了Spark的Dag和設計模式中的指令
2、Dag學習步驟:task –> Node –> DAG –> DAGExecutor
3、設計模式指令 http://www.voidcn.com/article/p-fxkbitkn-ov.html
4、圖解Dag類的學習步驟
4、重點代碼講解
下面這段代碼是核心也是最難的,如何找到父節點
// Picks the next runnable task: the first pending task (in the iteration order of
// _pending) whose parents have all already succeeded. Returns None when no pending
// task qualifies.
private def getPending: Option[T] = {
_pending.find { name =>
// Parents declared for this task.
val parents = _nodes(name)
// "No parent is missing from _success"; trivially true when the task has no parents.
// The inner `name` shadows the outer one and refers to a parent, not to the task.
!parents.exists(name => !_success.contains(name))
}
}
1、nodes沒有父節點時,!parents.exists() 為true
2、parents.exists() 為false時,!parents.exists() 為true
5、代碼展現
DAG.scala
package com.yh.dag
import java.time.{Duration, LocalDate}
import com.yh.nodeexecutor._
import org.slf4j.LoggerFactory
import scala.collection.immutable.{ListMap, ListSet}
/**
 * One vertex of the dependency graph: a task together with the tasks it depends on.
 *
 * @param task   the task held by this node
 * @param parent the tasks listed as this node's parents (may be empty)
 * @tparam T     the task type
 */
case class Node[T](task: T, parent: T*) {

  /** Renders as `task(p1,p2,...)`; a node without parents renders as `task()`. */
  override def toString: String = {
    val parents = parent.mkString("(", ",", ")")
    s"$task$parents"
  }
}
/**
 * A dependency graph described as a flat list of nodes; each node names its own parents.
 * Nothing in this class checks that the graph is acyclic or that every parent is also
 * present as a node.
 *
 * @param nodes the nodes making up the graph
 * @tparam T    the task type
 */
case class DAG[T](nodes: Node[T]*)
/**
 * Runs the tasks of a [[DAG]] one after another on the calling thread, starting a task
 * only once every one of its parents has succeeded.
 *
 * Bookkeeping is kept in mutable fields that are reset at the start of every
 * [[execute]] call; there is no synchronization around them.
 *
 * @param dag the graph whose tasks are to be run
 * @tparam T  the task type
 */
case class DAGExecutor[T](dag: DAG[T]) {
  import scala.util.control.NonFatal

  private val LOG = LoggerFactory.getLogger(this.getClass)

  // task -> declared parents, with null parents dropped.
  private val _nodes: Map[T, Seq[T]] =
    dag.nodes.map(node => (node.task, node.parent.filter(_ != null))).toMap

  // Tasks that have neither succeeded nor failed yet.
  private var _pending: Set[T] = ListSet()
  // Failed tasks with the reason, in the order they were marked as failed.
  private var _fails = ListMap[T, String]()
  // Successfully finished tasks, in completion order.
  private var _success = Seq[T]()

  /**
   * Finds a pending task that is ready to run, i.e. one whose parents have all
   * succeeded. A task without parents is always ready.
   *
   * @return a ready task, or None when no pending task qualifies
   */
  private def getPending: Option[T] =
    _pending.find { name =>
      _nodes(name).forall(parent => _success.contains(parent))
    }

  /**
   * Marks `name` as failed and cascades the failure to every still-pending task that
   * lists it as a parent (transitively).
   *
   * @param name    the task that failed
   * @param message the reason recorded for the failure
   */
  private def fail(name: T, message: String): Unit = {
    _pending -= name
    _fails += name -> message
    for {
      child <- _pending.filter(child => _nodes(child).contains(name))
      // A deeper recursive call may already have failed this child (diamond-shaped
      // dependencies); do not process it a second time.
      if _pending.contains(child)
    } {
      fail(child, s"依賴的任務無法執行: $name")
    }
  }

  /** Marks `name` as successfully finished. */
  private def success(name: T): Unit = {
    _pending -= name
    _success = _success :+ name
  }

  /**
   * Runs a single task through `func`, recording success or (cascading) failure.
   * Only non-fatal exceptions are treated as task failures; fatal JVM errors propagate.
   */
  private def runTask(task: T, func: T => Unit): Unit = {
    val startMills = System.currentTimeMillis()
    LOG.info("start task {}", task)
    try {
      println("=============")
      func(task)
      println("+++++++++++++")
      val time = Duration.ofMillis(System.currentTimeMillis() - startMills)
      LOG.info(s"end task $task time=$time")
      success(task)
    } catch {
      case NonFatal(e) =>
        // getMessage may be null (e.g. a bare NullPointerException); fall back to toString.
        val reason = Option(e.getMessage).getOrElse(e.toString)
        fail(task, reason)
        LOG.error(reason, e)
        LOG.info(s"fail task $task")
    }
  }

  /**
   * Executes every task of the graph in dependency order and logs a summary.
   *
   * A task whose `func` call throws is recorded as failed, and so is every task that
   * depends on it. Tasks that can never become ready (a parent that is not a node of
   * the graph, or a dependency cycle) are not run and are reported in a warning.
   *
   * @param func the action to perform for each task
   */
  def execute(func: T => Unit): Unit = {
    _pending = _nodes.keySet
    _fails = ListMap()
    _success = Seq()

    var next = getPending
    while (next.nonEmpty) {
      next.foreach(task => runTask(task, func))
      next = getPending
    }

    for (name <- _success) {
      LOG.info(s"success task: $name")
    }
    for (name <- _fails) {
      LOG.info(s"fail task: ${name._1} - ${name._2}")
    }
    // Anything still pending here was never ready: its parents neither succeeded nor failed.
    if (_pending.nonEmpty) {
      LOG.warn(s"unscheduled tasks (unknown parent or dependency cycle): ${_pending.mkString(",")}")
    }
  }
}
/** Sample wiring of the SDK executors into a graph, plus an entry point that runs it. */
object DAG {

  // Every executor except WebSdkparseExecutor depends on WebSdkparseExecutor.
  // UserTagExecutor lists that parent twice, exactly as in the original definition.
  val allSDKDAG = new DAG[Task](
    Node[Task](UserDetailsExecutor, WebSdkparseExecutor),
    Node[Task](UserTagExecutor, WebSdkparseExecutor, WebSdkparseExecutor),
    Node[Task](WebSdkparseExecutor),
    Node[Task](UserOverviewExecutor, WebSdkparseExecutor)
  )

  /** Runs every task of `allSDKDAG`, passing a fixed app key and today's date twice. */
  def main(args: Array[String]): Unit = {
    val runner = DAGExecutor(allSDKDAG)
    runner.execute { task =>
      val appkey: String = "appkey"
      task.executor(appkey, LocalDate.now(), LocalDate.now())
    }
  }
}
Task.scala
package com.yh.dag
import java.time.LocalDate
import org.apache.spark.sql.SQLContext
import org.slf4j.LoggerFactory
/** Base type of everything the DAG executor can schedule. */
abstract class Task {

  /** Logger named after the concrete subclass. */
  protected val LOG = LoggerFactory.getLogger(this.getClass)

  /**
   * The work performed by this task.
   *
   * @param appkey   application key passed through by the caller
   * @param startDay start day passed through by the caller
   * @param endDay   end day passed through by the caller
   */
  def executor(appkey: String, startDay: LocalDate, endDay: LocalDate): Unit

  /** Default entry point: forwards its arguments unchanged to [[executor]]. */
  def run(appkey: String, startDay: LocalDate, endDay: LocalDate): Unit =
    executor(appkey, startDay, endDay)
}
/** A [[Task]] that also carries the implicit SQLContext slot from [[SQLContextAware]]. */
abstract class Executor extends Task with SQLContextAware {

  // NOTE(review): this override makes run a no-op, so calling run on an Executor does
  // NOT reach executor (unlike Task.run). Confirm this is intended.
  override def run(appkey: String, startDay: LocalDate, endDay: LocalDate): Unit = ()
}
/** Mixin that gives a class a mutable, implicitly available SQLContext slot. */
trait SQLContextAware {
// Starts out as the default value for a reference type (null); nothing in this trait
// assigns it, so it has to be set from outside before it is used.
implicit var ctx: SQLContext = _
}
UserDetailsExecutor.scala
package com.yh.nodeexecutor
import java.time.LocalDate
import com.yh.dag.Executor
/** Demo executor for the user-details step; it only prints a marker line. */
object UserDetailsExecutor extends Executor {

  /** Prints the marker line; the arguments are not used. */
  override def executor(appkey: String, startDay: LocalDate, endDay: LocalDate): Unit =
    println("++++我的UserDetailsProcessor的執行過程++++")
}
UserOverviewExecutor.scala
package com.yh.nodeexecutor
import java.time.LocalDate
import com.yh.dag.Executor
/** Demo executor for the user-overview step; it only prints a marker line. */
object UserOverviewExecutor extends Executor {

  /** Prints the marker line; the arguments are not used. */
  override def executor(appkey: String, startDay: LocalDate, endDay: LocalDate): Unit =
    println("++++我的UserOverviewProcessor的執行過程++++")
}
UserTagExecutor.scala
package com.yh.nodeexecutor
import java.time.LocalDate
import com.yh.dag.Executor
/** Demo executor for the user-tag step; it only prints a marker line. */
object UserTagExecutor extends Executor {

  /** Prints the marker line; the arguments are not used. */
  override def executor(appkey: String, startDay: LocalDate, endDay: LocalDate): Unit =
    println("++++我的UserTagProcessor的執行過程++++")
}
WebSdkparseExecutor.scala
package com.yh.nodeexecutor
import java.time.LocalDate
import com.yh.dag.Executor
/** Demo executor for the web-SDK parsing step; it only prints a marker line. */
object WebSdkparseExecutor extends Executor {

  /** Prints the marker line; the arguments are not used. */
  override def executor(appkey: String, startDay: LocalDate, endDay: LocalDate): Unit =
    println("++++我的WebSdkparseProcessor的執行過程++++")
}
6、運作結果
=============
++++我的WebSdkparseProcessor的執行過程++++
+++++++++++++
=============
++++我的UserDetailsProcessor的執行過程++++
+++++++++++++
=============
++++我的UserTagProcessor的執行過程++++
+++++++++++++
=============
++++我的UserOverviewProcessor的執行過程++++
+++++++++++++
Process finished with exit code 0