天天看點

Using GeoTrellis (41): The Pipeline Technique

Preface

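Previously, to make it convenient for users to import data (GeoTiff and other remote sensing imagery) into a backend (Accumulo, File, Hadoop, etc.), GeoTrellis provided an ETL class. Its input is a JSON file configured by the user, which contains the information needed during processing: data source, data type, projection, tile type, processing method, and so on.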

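Starting with version 2.0, GeoTrellis added a pipeline feature. Users can describe the processing steps as a pipeline in either JSON or an AST, and then simply run that pipeline; the system automatically turns the input data into the final result.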

This article briefly analyzes how pipelines are implemented in GeoTrellis and discusses how the technique could be applied in other areas.

1. How It Works

1.1 Comparing the Two Approaches

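In terms of functionality and performance there is actually no improvement: the code of the old ETL class has simply been split into individual pipeline nodes, and the information those nodes carry is the same information that used to live in the JSON configuration file. So what is the point of the change? Let's first compare the code for the same processing workflow written both ways.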

  • The ETL approach:
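// Excerpt from GeoTrellis's Etl.scala (spark-etl module); package and import statements are omitted
object Etl {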
  val defaultModules = Array(s3.S3Module, hadoop.HadoopModule, file.FileModule, accumulo.AccumuloModule, cassandra.CassandraModule, hbase.HBaseModule)

  type SaveAction[K, V, M] = (AttributeStore, Writer[LayerId, RDD[(K, V)] with Metadata[M]], LayerId, RDD[(K, V)] with Metadata[M]) => Unit

  object SaveAction {
    def DEFAULT[K, V, M] = {
      (_: AttributeStore, writer: Writer[LayerId, RDD[(K, V)] with Metadata[M]], layerId: LayerId, rdd: RDD[(K, V)] with Metadata[M]) =>
        writer.write(layerId, rdd)
    }
  }

  def ingest[
    I: Component[?, ProjectedExtent]: TypeTag: ? => TilerKeyMethods[I, K],
    K: SpatialComponent: Boundable: TypeTag,
    V <: CellGrid: TypeTag: RasterRegionReproject: Stitcher: (? => TileReprojectMethods[V]): (? => CropMethods[V]): (? => TileMergeMethods[V]): (? => TilePrototypeMethods[V])
  ](
     args: Seq[String], modules: Seq[TypedModule] = Etl.defaultModules
   )(implicit sc: SparkContext) = {
    implicit def classTagK = ClassTag(typeTag[K].mirror.runtimeClass(typeTag[K].tpe)).asInstanceOf[ClassTag[K]]
    implicit def classTagV = ClassTag(typeTag[V].mirror.runtimeClass(typeTag[V].tpe)).asInstanceOf[ClassTag[V]]

    EtlConf(args) foreach { conf =>
      /* parse command line arguments */
      val etl = Etl(conf, modules)
      /* load source tiles using input module specified */
      val sourceTiles = etl.load[I, V]
      /* perform the reprojection and mosaicing step to fit tiles to LayoutScheme specified */
      val (zoom, tiled) = etl.tile(sourceTiles)
      /* save and optionally pyramid the mosaiced layer */
      etl.save[K, V](LayerId(etl.input.name, zoom), tiled)
    }
  }
}

case class Etl(conf: EtlConf, @transient modules: Seq[TypedModule] = Etl.defaultModules) extends LazyLogging {
  import Etl.SaveAction

  val input  = conf.input
  val output = conf.output

  def scheme: Either[LayoutScheme, LayoutDefinition] = {
    if (output.layoutScheme.nonEmpty) {
      val scheme = output.getLayoutScheme
      logger.info(scheme.toString)
      Left(scheme)
    } else if (output.layoutExtent.nonEmpty) {
      val layout = output.getLayoutDefinition
      logger.info(layout.toString)
      Right(layout)
    } else
      sys.error("Either layoutScheme or layoutExtent with cellSize/tileLayout must be provided")
  }

  @transient val combinedModule = modules reduce (_ union _)

  /**
    * Loads RDD of rasters using the input module specified in the arguments.
    * This RDD will contain rasters as they are stored, possibly overlapping and not conforming to any tile layout.
    *
    * @tparam I Input key type
    * @tparam V Input raster value type
    */
  def load[I: Component[?, ProjectedExtent]: TypeTag, V <: CellGrid: TypeTag]()(implicit sc: SparkContext): RDD[(I, V)] = {
    val plugin = {
      val plugins = combinedModule.findSubclassOf[InputPlugin[I, V]]
      if(plugins.isEmpty) sys.error(s"Unable to find input module for input key type '${typeTag[I].tpe.toString}' and tile type '${typeTag[V].tpe.toString}'")
      plugins.find(_.suitableFor(input.backend.`type`.name, input.format)).getOrElse(sys.error(s"Unable to find input module of type '${input.backend.`type`.name}' for format '${input.format}' " +
        s"for input key type '${typeTag[I].tpe.toString}' and tile type '${typeTag[V].tpe.toString}'"))
    }

    // clip in dest crs
    input.clip.fold(plugin(conf))(extent => plugin(conf).filter { case (k, _) =>
      val pe = k.getComponent[ProjectedExtent]
      output.getCrs.fold(extent.contains(pe.extent))(crs => extent.contains(pe.extent.reproject(pe.crs, crs)))
    })
  }

  /**
    * Tiles RDD of arbitrary rasters to conform to a layout scheme or definition provided in the arguments.
    * First metadata will be collected over input rasters to determine the overall extent, common crs, and resolution.
    * This information will be used to select a LayoutDefinition if LayoutScheme is provided in the arguments.
    *
    * The tiling step will use this LayoutDefinition to cut input rasters into chunks that conform to the layout.
    * If multiple rasters contribute to single target tile their values will be merged cell by cell.
    *
    * The timing of the reproject steps depends on the method chosen.
    * BufferedReproject must be performed after the tiling step because it leans on SpatialComponent to identify neighboring
    * tiles and sample their edge pixels. This method is the default and produces the best results.
    *
    * PerTileReproject method will be performed before the tiling step, on source tiles. When using this method the
    * reproject logic does not have access to pixels past the tile boundary and will see them as NODATA.
    * However, this approach does not require all source tiles to share a projection.
    *
    * @param rdd    RDD of source rasters
    * @param method Resampling method to be used when merging raster chunks in tiling step
    */
  def tile[
    I: Component[?, ProjectedExtent]: (? => TilerKeyMethods[I, K]),
    V <: CellGrid: RasterRegionReproject: Stitcher: ClassTag: (? => TileMergeMethods[V]): (? => TilePrototypeMethods[V]):
    (? => TileReprojectMethods[V]): (? => CropMethods[V]),
    K: SpatialComponent: Boundable: ClassTag
  ](rdd: RDD[(I, V)], method: ResampleMethod = output.resampleMethod): (Int, RDD[(K, V)] with Metadata[TileLayerMetadata[K]]) = {
    val targetCellType = output.cellType
    val destCrs = output.getCrs.get

    /** Tile layers from some resolution and adjust partition count based on resolution difference */
    def resizingTileRDD(
      rdd: RDD[(I, V)],
      floatMD: TileLayerMetadata[K],
      targetLayout: LayoutDefinition
    ): RDD[(K, V)] with Metadata[TileLayerMetadata[K]] = {
      // rekey metadata to targetLayout
      val newSpatialBounds = KeyBounds(targetLayout.mapTransform(floatMD.extent))
      val tiledMD = floatMD.copy(
        bounds = floatMD.bounds.setSpatialBounds(newSpatialBounds),
        layout = targetLayout
      )

      // > 1 means we're upsampling during tiling process
      val resolutionRatio = floatMD.layout.cellSize.resolution / targetLayout.cellSize.resolution
      val tilerOptions = Tiler.Options(
        resampleMethod = method,
        partitioner = new HashPartitioner(
          partitions = (math.pow(2, (resolutionRatio - 1) * 2) * rdd.partitions.length).toInt))

      rdd.tileToLayout[K](tiledMD, tilerOptions)
    }

    output.reprojectMethod match {
      case PerTileReproject =>
        def reprojected(targetCellSize: Option[CellSize] = None) = {
          val reprojectedRdd = rdd.reproject(destCrs, RasterReprojectOptions(method = method, targetCellSize = targetCellSize))

          val floatMD = { // collecting floating metadata allows detecting upsampling
            val (_, md) = reprojectedRdd.collectMetadata(FloatingLayoutScheme(output.tileSize))
            md.copy(cellType = targetCellType.getOrElse(md.cellType))
          }

          reprojectedRdd -> floatMD
        }

        scheme match {
          case Left(scheme: ZoomedLayoutScheme) if output.maxZoom.isDefined =>
            val LayoutLevel(zoom, layoutDefinition) = scheme.levelForZoom(output.maxZoom.get)
            val (reprojectedRdd, floatMD) = reprojected(Some(layoutDefinition.cellSize))
            zoom -> resizingTileRDD(reprojectedRdd, floatMD, layoutDefinition)

          case Left(scheme) => // True for both FloatingLayoutScheme and ZoomedLayoutScheme
            val (reprojectedRdd, floatMD) = reprojected()
            val LayoutLevel(zoom, layoutDefinition) = scheme.levelFor(floatMD.extent, floatMD.cellSize)
            zoom -> resizingTileRDD(reprojectedRdd, floatMD, layoutDefinition)

          case Right(layoutDefinition) =>
            val (reprojectedRdd, floatMD) = reprojected()
            0 -> resizingTileRDD(reprojectedRdd, floatMD, layoutDefinition)
        }

      case BufferedReproject =>
        // Buffered reproject requires that tiles are already in some layout so we can find the neighbors
        val md = { // collecting floating metadata allows detecting upsampling
          val (_, md) = rdd.collectMetadata(FloatingLayoutScheme(output.tileSize))
          md.copy(cellType = targetCellType.getOrElse(md.cellType))
        }
        val tiled = ContextRDD(rdd.tileToLayout[K](md, method), md)

        scheme match {
          case Left(layoutScheme: ZoomedLayoutScheme) if output.maxZoom.isDefined =>
            val LayoutLevel(zoom, layoutDefinition) = layoutScheme.levelForZoom(output.maxZoom.get)
            (zoom, output.bufferSize match {
              case Some(bs) => tiled.reproject(destCrs, layoutDefinition, bs, RasterReprojectOptions(method = method, targetCellSize = Some(layoutDefinition.cellSize)))._2
              case _ => tiled.reproject(destCrs, layoutDefinition, RasterReprojectOptions(method = method, targetCellSize = Some(layoutDefinition.cellSize)))._2
            })

          case Left(layoutScheme) =>
            output.bufferSize match {
              case Some(bs) => tiled.reproject(destCrs, layoutScheme, bs, method)
              case _ => tiled.reproject(destCrs, layoutScheme, method)
            }

          case Right(layoutDefinition) =>
            output.bufferSize match {
              case Some(bs) => tiled.reproject(destCrs, layoutDefinition, bs, method)
              case _ => tiled.reproject(destCrs, layoutDefinition, method)
            }
        }
    }
  }

  /**
    * Saves provided RDD to an output module specified by the ETL arguments.
    * This step may perform two to one pyramiding until zoom level 1 is reached.
    *
    * @param id          Layer ID under which the layer is saved
    * @param rdd         Tiled raster RDD with TileLayerMetadata
    * @param saveAction  Function to be called for saving. Defaults to writing the layer.
    *                    This gives the caller an opportunity to modify the layer before writing,
    *                    or to save additional attributes in the attributes store.
    *
    * @tparam K  Key type with SpatialComponent corresponding LayoutDefinition
    * @tparam V  Tile raster with cells from single tile in LayoutDefinition
    */
  def save[
    K: SpatialComponent: TypeTag,
    V <: CellGrid: TypeTag: ? => TileMergeMethods[V]: ? => TilePrototypeMethods[V]
  ](
    id: LayerId,
    rdd: RDD[(K, V)] with Metadata[TileLayerMetadata[K]],
    saveAction: SaveAction[K, V, TileLayerMetadata[K]] = SaveAction.DEFAULT[K, V, TileLayerMetadata[K]]
  ): Unit = {
    implicit def classTagK = ClassTag(typeTag[K].mirror.runtimeClass(typeTag[K].tpe)).asInstanceOf[ClassTag[K]]
    implicit def classTagV = ClassTag(typeTag[V].mirror.runtimeClass(typeTag[V].tpe)).asInstanceOf[ClassTag[V]]

    val outputPlugin =
      combinedModule
        .findSubclassOf[OutputPlugin[K, V, TileLayerMetadata[K]]]
        .find { _.suitableFor(output.backend.`type`.name) }
        .getOrElse(sys.error(s"Unable to find output module of type '${output.backend.`type`.name}'"))

    def savePyramid(zoom: Int, rdd: RDD[(K, V)] with Metadata[TileLayerMetadata[K]]): Unit = {
      val currentId = id.copy(zoom = zoom)
      outputPlugin(currentId, rdd, conf, saveAction)

      scheme match {
        case Left(s) =>
          if (output.pyramid && zoom >= 1) {
            val (nextLevel, nextRdd) = Pyramid.up(rdd, s, zoom, output.getPyramidOptions)
            savePyramid(nextLevel, nextRdd)
          }
        case Right(_) =>
          if (output.pyramid)
            logger.error("Pyramiding only supported with layoutScheme, skipping pyramid step")
      }
    }

    savePyramid(id.zoom, rdd)
    logger.info("Done")
  }
}
  • The pipeline approach:
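// Imports assumed from the GeoTrellis 2.x spark-pipeline module; check the package names against your version
import geotrellis.spark._
import geotrellis.spark.tiling._
import geotrellis.spark.pipeline._
import geotrellis.spark.pipeline.ast._
import geotrellis.spark.pipeline.ast.untyped.ErasedNode
import geotrellis.spark.pipeline.json._
import geotrellis.spark.pipeline.json.read._
import geotrellis.spark.pipeline.json.transform._
import geotrellis.spark.pipeline.json.write._
import org.apache.spark.SparkContext
import scala.util.{Failure, Success, Try}

implicit val sc: SparkContext = ???

val scheme = Left[LayoutScheme, LayoutDefinition](FloatingLayoutScheme(512))
val jsonRead = JsonRead("s3://geotrellis-test/", `type` = ReadTypes.SpatialS3Type)
val jsonTileToLayout = TileToLayout(`type` = TransformTypes.SpatialTileToLayoutType)
val jsonReproject = Reproject("EPSG:3857", scheme, `type` = TransformTypes.SpatialBufferedReprojectType)
val jsonPyramid = Pyramid(`type` = TransformTypes.SpatialPyramidType)
val jsonWrite = JsonWrite("mask", "s3://geotrellis-test/pipeline/", PipelineKeyIndexMethod("zorder"), scheme, `type` = WriteTypes.SpatialType)

// chain the expressions: each ~ appends the next step
val list: List[PipelineExpr] = jsonRead ~ jsonTileToLayout ~ jsonReproject ~ jsonPyramid ~ jsonWrite

// typed way: parse the expression list into a typed AST node, then evaluate it
val typedAst: Node[Stream[(Int, TileLayerRDD[SpatialKey])]] =
  list
    .node[Stream[(Int, TileLayerRDD[SpatialKey])]]
val result: Stream[(Int, TileLayerRDD[SpatialKey])] = typedAst.eval

// in some cases you may just want to evaluate the pipeline;
// for more flexibility, the parsing and evaluation steps can be done manually:
// erasedNode parses the expression list into an ErasedNode that can be evaluated
val untypedAst: ErasedNode = list.erasedNode

// untyped result: the pipeline is simply evaluated,
// but you still have a chance to catch and handle some types of exceptions
val untypedResult: Any =
  Try {
    untypedAst.unsafeEval
  } match {
    case Success(res) => res
    case Failure(e)   => println(s"pipeline failed: ${e.getMessage}") // handle or log the failure here
  }

// typed result: the explicit type argument on Try gives eval the expected result type
val typedResult: Option[Stream[(Int, TileLayerRDD[SpatialKey])]] =
  Try[Stream[(Int, TileLayerRDD[SpatialKey])]] {
    untypedAst.eval
  } match {
    case Success(stream) => Some(stream)
    case Failure(_)      => None
  }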

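The amount of code alone shows that the pipeline approach requires far less code from the user. As mentioned above, the pipeline simply wraps the earlier operations into individual operation nodes whose code is already written; users only need to chain together the nodes they need and then run the whole pipeline.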
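The idea of pre-written nodes that are chained together and then run as a whole can be sketched in a few lines of Scala. This is a minimal illustration under our own assumptions, not GeoTrellis's implementation: `Step`, its `~` operator and the four stand-in nodes are hypothetical, and `~` here composes functions directly rather than building a list of JSON-style expressions.

```scala
// Hypothetical minimal typed pipeline (not GeoTrellis code).
// A Step is a pre-written node that turns an A into a B.
final case class Step[A, B](name: String, run: A => B) {
  // `~` only accepts a next step whose input type is this step's output type B,
  // so a mis-ordered chain fails to compile.
  def ~[C](next: Step[B, C]): Step[A, C] =
    Step(s"$name ~ ${next.name}", run.andThen(next.run))
}

// Stand-ins for read / transform / tile / write nodes; the bodies are toy logic.
val read  = Step[String, List[Double]]("read", _ => List(1.0, 2.0, 3.0, 4.0))
val scale = Step[List[Double], List[Double]]("scale", _.map(_ * 10))
val tile  = Step[List[Double], List[List[Double]]]("tile", _.grouped(2).toList)
val write = Step[List[List[Double]], Int]("write", tiles => tiles.size) // number of tiles "written"

// The user only chains the nodes, then runs the whole pipeline once.
val pipeline: Step[String, Int] = read ~ scale ~ tile ~ write
val written: Int = pipeline.run("s3://some-bucket/input/") // hypothetical URI
```

Here `pipeline.run` returns 2 (four values cut into two tiles), while a chain such as `read ~ write` is rejected by the compiler because `write` expects tiles, not a flat list of values.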

1.2 How It Is Implemented

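Attentive readers, or anyone familiar with GeoTrellis data ETL, will know that ETL essentially works like this: its input is raster data in one of the combinations of two band layouts (single-band or multiband) and two time formats (no time dimension or time series), together with the location where that data is stored (S3, Hadoop, File, etc.). ETL reads the data, applies transformations such as reprojection, merging, and pyramid generation, and finally writes the result to the specified backend.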
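These input combinations are few enough to enumerate, which hints at why a handful of predefined node types can cover what ETL does. A minimal sketch; the ADT below and the dotted labels (which only imitate the style of pipeline type names) are our own assumptions, not GeoTrellis's type definitions:

```scala
// Hypothetical model of the input "shape" a read step has to handle.
sealed trait BandLayout
case object Singleband extends BandLayout
case object Multiband  extends BandLayout

sealed trait TimeFormat
case object Spatial  extends TimeFormat // no time dimension
case object Temporal extends TimeFormat // time series

sealed trait Backend
case object S3     extends Backend
case object Hadoop extends Backend
case object File   extends Backend

final case class ReadKind(band: BandLayout, time: TimeFormat, backend: Backend) {
  // Illustrative label such as "singleband.spatial.read.s3"
  def label: String =
    List(band.toString, time.toString, "read", backend.toString).map(_.toLowerCase).mkString(".")
}

// Every band layout x time format x backend combination
val allReadKinds: List[ReadKind] =
  for {
    b <- List[BandLayout](Singleband, Multiband)
    t <- List[TimeFormat](Spatial, Temporal)
    s <- List[Backend](S3, Hadoop, File)
  } yield ReadKind(b, t, s)
```

Two band layouts times two time formats times three backends gives 12 read variants, from `singleband.spatial.read.s3` to `multiband.temporal.read.file`.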

So the pipeline implementation simply defines the corresponding ReadType, TransformType, and WriteType. Let's look at the example above:

val jsonRead = JsonRead("s3://geotrellis-test/", `type` = ReadTypes.SpatialS3Type)

This specifies the Read part: the storage path, the storage location (S3), the data type (single-band), and the time format (Spatial, i.e. without a time stamp).

val jsonTileToLayout = TileToLayout(`type` = TransformTypes.SpatialTileToLayoutType)
val jsonReproject = Reproject("EPSG:3857", scheme, `type` = TransformTypes.SpatialBufferedReprojectType)
val jsonPyramid = Pyramid(`type` = TransformTypes.SpatialPyramidType)

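TileToLayout cuts the input rasters into tiles that conform to a layout and produces an RDD carrying layer metadata (cell type, spatial layout, and so on), which the subsequent steps build on.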

Reproject transforms the data into another projection (here EPSG:3857).

Pyramid builds a tile pyramid from the data, producing successively coarser zoom levels.
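The key arithmetic behind these tiling and pyramid steps can be sketched for the simple case of a square layout split into 2^z × 2^z tiles at zoom z (a power-of-two, zoomed scheme; the example pipeline above actually uses FloatingLayoutScheme). This is a simplified illustration, not GeoTrellis code: `Key`, `keyFor` and `pyramidUp` are hypothetical, and `pyramidUp` models only the two-to-one merging of tile keys, not the resampling of pixels.

```scala
// Hypothetical, simplified tile key (not GeoTrellis's SpatialKey)
final case class Key(col: Int, row: Int)

// Tile-to-layout: which tile of the zoom-z grid covers map point (x, y)?
// The square layout extent (left edge xmin, top edge ymax, side length `side`)
// is split into 2^z x 2^z tiles; rows are counted downward from the top edge.
// Points lying exactly on the right/bottom edge are not clamped in this sketch.
def keyFor(x: Double, y: Double, xmin: Double, ymax: Double, side: Double, zoom: Int): Key = {
  val tileSide = side / (1 << zoom)
  Key(math.floor((x - xmin) / tileSide).toInt, math.floor((ymax - y) / tileSide).toInt)
}

// Pyramid step (two-to-one): each 2 x 2 block of tiles at zoom z maps to one tile at zoom z - 1
def parent(k: Key): Key = Key(k.col / 2, k.row / 2)
def pyramidUp(keys: Set[Key]): Set[Key] = keys.map(parent)

// A fully covered 8 x 8 grid at zoom 3, pyramided up to zoom 0
val base: Set[Key] = (for { c <- 0 until 8; r <- 0 until 8 } yield Key(c, r)).toSet
val levels: List[Set[Key]] = List.iterate(base, 4)(pyramidUp) // zoom 3, 2, 1, 0
```

For example, `keyFor(30, 90, 0, 100, 100, 2)` is `Key(1, 0)`, and `levels.map(_.size)` is `List(64, 16, 4, 1)`: each level up has a quarter as many tiles as the one below it.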

val jsonWrite = JsonWrite("mask", "s3://geotrellis-test/pipeline/", PipelineKeyIndexMethod("zorder"), scheme, `type` = WriteTypes.SpatialType)

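JsonWrite specifies how the data is written out, including the key index method and the output type; the system automatically determines the output storage location from the given URI.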
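Inferring the storage location from a URI usually comes down to inspecting the URI scheme. A minimal sketch of that idea using `java.net.URI`; the `Store` types and the scheme-to-backend table are assumptions for illustration, not GeoTrellis's actual lookup:

```scala
import java.net.URI

// Hypothetical backends a writer could target
sealed trait Store
case object S3Store     extends Store
case object HadoopStore extends Store
case object FileStore   extends Store

// Pick a backend from the URI scheme; a URI without a scheme is treated as a local path.
def storeFor(uri: String): Either[String, Store] =
  Option(new URI(uri).getScheme).map(_.toLowerCase) match {
    case Some("s3")          => Right(S3Store)
    case Some("hdfs")        => Right(HadoopStore)
    case Some("file") | None => Right(FileStore)
    case Some(other)         => Left(s"no backend registered for scheme '$other'")
  }
```

With this table, `storeFor("s3://geotrellis-test/pipeline/")` yields `Right(S3Store)`, a bare path such as `/tmp/catalog` falls back to the local file store, and an unknown scheme is reported as an error instead of being guessed.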

At this point, all three kinds of nodes described above have been defined.

val list: List[PipelineExpr] = jsonRead ~ jsonTileToLayout ~ jsonReproject ~ jsonPyramid ~ jsonWrite

This line chains the operation nodes above together into a List.
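The `~` chaining itself needs very little machinery: a step combined with a step starts a list, and a list combined with a step appends to it. A minimal sketch with a hypothetical `Expr` class standing in for `PipelineExpr` (the step names and parameters are made up); it also shows that the resulting list is plain data that maps naturally onto a JSON array of step objects:

```scala
// Hypothetical stand-in for a pipeline expression: a step type plus its parameters
final case class Expr(kind: String, params: Map[String, String] = Map.empty) {
  // expr ~ expr starts a list
  def ~(next: Expr): List[Expr] = List(this, next)
}

// list ~ expr appends to the list, so a ~ b ~ c ~ ... stays a flat List[Expr]
implicit class ExprListOps(exprs: List[Expr]) {
  def ~(next: Expr): List[Expr] = exprs :+ next
}

val steps: List[Expr] =
  Expr("read.s3", Map("uri" -> "s3://geotrellis-test/")) ~
    Expr("transform.tile-to-layout") ~
    Expr("transform.reproject", Map("crs" -> "EPSG:3857")) ~
    Expr("transform.pyramid") ~
    Expr("write", Map("name" -> "mask"))

// Render one step as a JSON object (no escaping; illustration only)
def quote(s: String): String = "\"" + s + "\""
def toJson(e: Expr): String =
  (("type" -> e.kind) +: e.params.toSeq)
    .map { case (k, v) => s"${quote(k)}:${quote(v)}" }
    .mkString("{", ",", "}")
```

Because `~` is left-associative, the five steps end up in one flat list in the order they were written, and `toJson(steps.head)` gives `{"type":"read.s3","uri":"s3://geotrellis-test/"}`.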

val typedAst: Node[Stream[(Int, TileLayerRDD[SpatialKey])]] =
  list
    .node[Stream[(Int, TileLayerRDD[SpatialKey])]]
val result: Stream[(Int, TileLayerRDD[SpatialKey])] = typedAst.eval

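These two statements parse the list into the corresponding typed node (the AST) and then call the eval function, which runs the whole pipeline and returns the final result.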

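Just these few statements complete the entire data processing workflow. Note that when chaining the final pipeline, the output type of each step must match the input type of the next step; otherwise the pipeline cannot run.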
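The matching-types rule can be made concrete with a small sketch of untyped evaluation, loosely in the spirit of the ErasedNode path in the example: every stage declares input and output type labels, and the chain is checked before anything runs. `Stage`, `validate` and `evalChain` are hypothetical; the article does not show how GeoTrellis performs its own checks.

```scala
// Hypothetical untyped stage: plain-string type labels plus an untyped function
final case class Stage(name: String, in: String, out: String, run: Any => Any)

// Reject the chain at the first pair whose output label does not match the next input label
def validate(stages: List[Stage]): Either[String, List[Stage]] =
  stages.zip(stages.drop(1)).collectFirst {
    case (a, b) if a.out != b.in => s"'${a.name}' outputs ${a.out} but '${b.name}' expects ${b.in}"
  }.toLeft(stages)

// Validate first, then feed each stage's output into the next one
def evalChain(stages: List[Stage], input: Any): Either[String, Any] =
  validate(stages).map(_.foldLeft(input)((acc, stage) => stage.run(acc)))

// Toy stages; the casts are what an untyped pipeline has to do internally
val readStage  = Stage("read", "Uri", "Rasters", _ => List(3, 1, 2))
val tileStage  = Stage("tileToLayout", "Rasters", "Tiles", r => r.asInstanceOf[List[Int]].sorted)
val writeStage = Stage("write", "Tiles", "Count", t => t.asInstanceOf[List[Int]].size)
```

`evalChain(List(readStage, tileStage, writeStage), "s3://bucket/")` returns `Right(3)`, while the mis-ordered chain `List(readStage, writeStage)` is reported as an error before any stage runs.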

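The whole idea is very similar to currently popular neural network frameworks such as TensorFlow and Keras, where you first define a model made of processing nodes. Such a model is really a data processing model, so the two are essentially the same; the difference is that neural networks pay more attention to the state of the data, such as its dimensions and size (number of nodes), whereas GeoTrellis focuses on how the data is processed.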

That concludes the introduction to how pipelines are implemented in GeoTrellis; interested readers can dig further into the source code.

2. Inspiration

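After studying GeoTrellis's pipeline technique carefully, I found that many things could be implemented this way, such as the neural networks just mentioned. As another example, we could wrap other kinds of remote sensing processing into pipelines as well, such as different model computations, dodging and color balancing, and orthorectification.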

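Any process whose steps depend on one another, or that has to be carried out step by step, can be wrapped as a pipeline. This makes later processing more standardized, the code leaner, and the whole thing easier to use. It is exactly the kind of revolutionary change that the Ford Motor Company's assembly line brought to the entire automobile industry.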

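I have recently been reading books on computer architecture, which put particular emphasis on instruction pipelining in CPUs. The same technique can be applied to data processing: if the processing workflow is run the way a CPU runs instructions, memory-intensive steps and CPU-intensive steps can be staggered so that they overlap, keeping the server fully loaded, which should speed up processing.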
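How much overlapping stages can help is easy to estimate with the usual pipeline timing argument. A minimal sketch under simplifying assumptions of our own (not from the article): two stages, an I/O- or memory-bound stage taking `a` time units per item and a CPU-bound stage taking `b`, one worker per stage, and an unbounded buffer between them.

```scala
// Run every item through both stages one after another
def sequentialTime(n: Int, a: Int, b: Int): Int = n * (a + b)

// Simulated two-stage pipeline: stage 1 starts the next item as soon as it is free;
// stage 2 starts an item once that item has left stage 1 and stage 2 is free.
def pipelinedTime(n: Int, a: Int, b: Int): Int = {
  val (_, lastDone) = (0 until n).foldLeft((0, 0)) { case ((free1, free2), _) =>
    val done1 = free1 + a                  // item leaves the I/O stage
    val done2 = math.max(done1, free2) + b // item leaves the CPU stage
    (done1, done2)
  }
  lastDone
}

// Closed form for this model (n >= 1): the first item fills the pipeline (a + b),
// after which one item completes every max(a, b) time units.
def pipelinedClosedForm(n: Int, a: Int, b: Int): Int = a + b + (n - 1) * math.max(a, b)
```

For n = 10, a = 3, b = 2, the sequential run takes 50 time units and the pipelined run 32. The slower stage sets the pace, so the gain is largest when the stages are roughly balanced.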

3. Summary

This article introduced how pipelines are implemented in GeoTrellis and briefly discussed what this technique suggests for other kinds of processing.

Links to the GeoTrellis article series: http://www.cnblogs.com/shoufengwei/p/5619419.html

Author: 魏守峰

Company: 武漢一格空間科技有限公司

Product: a workflow-based tabular data processing platform

Source: http://www.cnblogs.com/shoufengwei/

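The copyright of this article is shared by the author and cnblogs. Reposting and discussion are welcome, but unless the author agrees otherwise, this notice must be retained and a link to the original article must be given in a prominent place on the page. If you found this article helpful, feel free to like it and join the discussion.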
