Spark's Directed Acyclic Graph (DAG): Code and Diagrams

Contents:

1. Directed acyclic graphs
2. Code structure
3. Learning steps and approach
4. Key code walkthrough
5. Full code
6. Run results

———————————————————————————————————

1. Directed acyclic graphs

In graph theory, a directed graph is a directed acyclic graph (DAG) if there is no way to start at any vertex and follow a sequence of directed edges back to that same vertex.

Because a vertex in a directed graph may reach another vertex by two different routes without forming a cycle, a DAG cannot in general be converted into a tree; every directed tree, however, is a DAG.

Property: the number of spanning trees of a DAG equals the product of the in-degrees of its nodes with nonzero in-degree. Intuitively, each such node independently picks one of its incoming edges as its tree edge, and acyclicity guarantees that no combination of choices can close a cycle.

(figure: example of a directed acyclic graph)

2. Code structure

(figure: project layout; the source files shown in section 5 are)
com/yh/dag/DAG.scala
com/yh/dag/Task.scala
com/yh/nodeexecutor/UserDetailsExecutor.scala
com/yh/nodeexecutor/UserOverviewExecutor.scala
com/yh/nodeexecutor/UserTagExecutor.scala
com/yh/nodeexecutor/WebSdkparseExecutor.scala

3. Learning steps and approach

1. This post covers Spark's DAG together with the command pattern from design patterns (a minimal sketch of the pattern follows the diagram below).

2. Suggested learning order for the DAG code: Task -> Node -> DAG -> DAGExecutor.

3. Command pattern reference: http://www.voidcn.com/article/p-fxkbitkn-ov.html

4. Diagram of the learning path through the DAG classes:

(figure: learning-path diagram for the DAG classes)

4. Key code walkthrough

The snippet below is the core of the scheduler and the hardest part to read: finding a pending task whose parent tasks have all finished.

// Find a pending task whose parent tasks have all run successfully (false/true).
private def getPending: Option[T] = {
  _pending.find { name =>
    val parents = _nodes(name)
    !parents.exists(parent => !_success.contains(parent))
  }
}

1. When a node has no parents, parents.exists(...) is vacuously false, so !parents.exists(...) is true.

2. When every parent is already in _success, parents.exists(...) is false, so !parents.exists(...) is true.

(figure: illustration of the getPending parent check)

5. Full code

DAG.scala

package com.yh.dag

import java.time.{Duration, LocalDate}
import com.yh.nodeexecutor._
import org.slf4j.LoggerFactory
import scala.collection.immutable.{ListMap, ListSet}

case class Node[T](task: T, parent: T*) {
  override def toString: String = s"$task(${parent.mkString(",")})"
}

case class DAG[T](nodes: Node[T]*)

case class DAGExecutor[T](dag: DAG[T]) {
  private val LOG = LoggerFactory.getLogger(this.getClass)

  // task -> its parent tasks (null parents are dropped)
  private val _nodes: Map[T, Seq[T]] =
    dag.nodes.map(node => (node.task, node.parent.filter(_ != null))).toMap
  private var _pending: Set[T] = ListSet()
  private var _fails = ListMap[T, String]()
  private var _success = Seq[T]()

  // Find a pending task whose parent tasks have all run successfully (false/true).
  private def getPending: Option[T] = {
    _pending.find { name =>
      val parents = _nodes(name)
      !parents.exists(parent => !_success.contains(parent))
    }
  }

  private def fail(name: T, message: String): Unit = {
    _pending -= name
    _fails += name -> message
    // Cascade the failure to every pending task that depends on this one.
    for (child <- _pending.filter(child => _nodes(child).contains(name))) {
      fail(child, s"a task it depends on could not run: $name")
    }
  }

  private def success(name: T): Unit = {
    _pending -= name
    _success = _success :+ name
  }

  def execute(func: T => Unit): Unit = {
    _pending = _nodes.keySet
    _fails = ListMap()
    _success = Seq()
    var running = true

    while (running) {
      val taskOpt = getPending
      if (taskOpt.nonEmpty) {
        val task = taskOpt.get
        val startMills = System.currentTimeMillis()
        LOG.info("start task {}", task)
        try {
          println("=============")
          func(task) // run the executor for this task
          println("+++++++++++++")
          val time = Duration.ofMillis(System.currentTimeMillis() - startMills)
          LOG.info(s"end task $task time=$time")
          success(task)
        } catch {
          case e: Throwable =>
            fail(task, e.getMessage)
            LOG.error(e.getMessage, e)
            LOG.info(s"fail task $task")
        }
      } else {
        running = false
      }
    }

    for (name <- _success) {
      LOG.info(s"success task: $name")
    }
    for (name <- _fails) {
      LOG.info(s"fail task: ${name._1} - ${name._2}")
    }
  }
}

object DAG {
  val allSDKDAG = new DAG[Task](
    Node(UserDetailsExecutor, WebSdkparseExecutor),
    Node(UserTagExecutor, WebSdkparseExecutor, WebSdkparseExecutor),
    Node(WebSdkparseExecutor),
    Node(UserOverviewExecutor, WebSdkparseExecutor)
  )

  def main(args: Array[String]): Unit = {
    DAGExecutor(allSDKDAG).execute { task =>
      task.executor("appkey", LocalDate.now(), LocalDate.now())
    }
  }
}

Task.scala

package com.yh.dag

import java.time.LocalDate
import org.apache.spark.sql.SQLContext
import org.slf4j.LoggerFactory

// Base class for every node in the DAG: subclasses implement executor().
abstract class Task {
  protected val LOG = LoggerFactory.getLogger(this.getClass)

  def executor(appkey: String, startDay: LocalDate, endDay: LocalDate): Unit

  def run(appkey: String, startDay: LocalDate, endDay: LocalDate): Unit = {
    executor(appkey, startDay, endDay)
  }
}

// Concrete executors extend this and receive a SQLContext through SQLContextAware.
abstract class Executor extends Task with SQLContextAware {
  override def run(appkey: String, startDay: LocalDate, endDay: LocalDate) = {}
}

trait SQLContextAware {
  implicit var ctx: SQLContext = _
}

UserDetailsExecutor.scala

package com.yh.nodeexecutor

import java.time.LocalDate
import com.yh.dag.Executor

object UserDetailsExecutor extends Executor {
  override def executor(appkey: String, startDay: LocalDate, endDay: LocalDate): Unit = {
    println("++++ UserDetailsProcessor running ++++")
  }
}

UserOverviewExecutor.scala

package com.yh.nodeexecutor

import java.time.LocalDate
import com.yh.dag.Executor

object UserOverviewExecutor extends Executor {
  override def executor(appkey: String, startDay: LocalDate, endDay: LocalDate): Unit = {
    println("++++ UserOverviewProcessor running ++++")
  }
}

UserTagExecutor.scala

package com.yh.nodeexecutor

import java.time.LocalDate
import com.yh.dag.Executor

object UserTagExecutor extends Executor {
  override def executor(appkey: String, startDay: LocalDate, endDay: LocalDate): Unit = {
    println("++++ UserTagProcessor running ++++")
  }
}

WebSdkparseExecutor.scala

package com.yh.nodeexecutor

import java.time.LocalDate
import com.yh.dag.Executor

object WebSdkparseExecutor extends Executor {
  override def executor(appkey: String, startDay: LocalDate, endDay: LocalDate): Unit = {
    println("++++ WebSdkparseProcessor running ++++")
  }
}

6. Run results

=============
++++ WebSdkparseProcessor running ++++
+++++++++++++
=============
++++ UserDetailsProcessor running ++++
+++++++++++++
=============
++++ UserTagProcessor running ++++
+++++++++++++
=============
++++ UserOverviewProcessor running ++++
+++++++++++++

Process finished with exit code 0

WebSdkparseExecutor runs first because every other task lists it as a parent; once it succeeds, the remaining three become runnable and execute in turn.