I originally put together a demo project that integrates Spring Boot with Spark, so that Spark computations are driven by HTTP requests. Recently I looked into GeoSpark, GeoTrellis, and GeoMesa, the big-data technologies on the GIS side, and wrote a demo around them. The versions used here are Spark 2.4.0, Scala 2.11, GeoTrellis 2.3.1, GeoTools 20.0, and Guava 14.0.1.
The relevant pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.i-tudou.bd</groupId>
<artifactId>spring-spark-demo</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>A Camel Scala Route</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<scala.version>2.11</scala.version>
<spark.version>2.4.0</spark.version>
<guava.version>14.0.1</guava.version>
<geotrellis.version>2.3.1</geotrellis.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.3.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.locationtech.geotrellis</groupId>
<artifactId>geotrellis-spark_${scala.version}</artifactId>
<version>${geotrellis.version}</version>
</dependency>
<dependency>
<groupId>org.locationtech.geotrellis</groupId>
<artifactId>geotrellis-raster_${scala.version}</artifactId>
<version>${geotrellis.version}</version>
</dependency>
<dependency>
<groupId>org.locationtech.geotrellis</groupId>
<artifactId>geotrellis-vector_${scala.version}</artifactId>
<version>${geotrellis.version}</version>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-coverage</artifactId>
<version>20.0</version>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-referencing</artifactId>
<version>20.0</version>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-main</artifactId>
<version>20.0</version>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-shapefile</artifactId>
<version>20.0</version>
</dependency>
<dependency>
<groupId>org.locationtech.geotrellis</groupId>
<artifactId>geotrellis-geotools_${scala.version}</artifactId>
<version>${geotrellis.version}</version>
<exclusions>
<exclusion>
<groupId>org.geotools</groupId>
<artifactId>gt-coverage</artifactId>
</exclusion>
<exclusion>
<groupId>org.geotools</groupId>
<artifactId>gt-epsg-hsql</artifactId>
</exclusion>
<exclusion>
<groupId>org.geotools</groupId>
<artifactId>gt-main</artifactId>
</exclusion>
<exclusion>
<groupId>org.geotools</groupId>
<artifactId>gt-referencing</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.locationtech.geotrellis</groupId>
<artifactId>geotrellis-util_${scala.version}</artifactId>
<version>${geotrellis.version}</version>
</dependency>
<dependency>
<groupId>org.locationtech.geotrellis</groupId>
<artifactId>geotrellis-proj4_${scala.version}</artifactId>
<version>${geotrellis.version}</version>
</dependency>
<dependency>
<groupId>org.locationtech.geotrellis</groupId>
<artifactId>geotrellis-spark-pipeline_${scala.version}</artifactId>
<version>${geotrellis.version}</version>
</dependency>
<dependency>
<groupId>org.locationtech.geotrellis</groupId>
<artifactId>geotrellis-shapefile_${scala.version}</artifactId>
<version>${geotrellis.version}</version>
<exclusions>
<exclusion>
<groupId>org.geotools</groupId>
<artifactId>gt-shapefile</artifactId>
</exclusion>
<exclusion>
<groupId>javax.media</groupId>
<artifactId>jai_core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.locationtech.geotrellis</groupId>
<artifactId>geotrellis-spark-etl_${scala.version}</artifactId>
<version>${geotrellis.version}</version>
</dependency>
<!--<dependency>
<groupId>org.locationtech.geotrellis</groupId>
<artifactId>geotrellis-spark-etl_2.12</artifactId>
<version>${geotrellis.version}</version>
</dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<!--<exclusion>
<groupId>org.hibernate</groupId>
<artifactId> hibernate-validator</artifactId>
</exclusion>-->
</exclusions>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-undertow</artifactId>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.0.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<repositories>
<repository>
<id>maven2-repository.dev.java.net</id>
<name>Java.net repository</name>
<url>http://download.java.net/maven/2</url>
</repository>
<repository>
<id>osgeo</id>
<name>Open Source Geospatial Foundation Repository</name>
<url>http://download.osgeo.org/webdav/geotools/</url>
</repository>
<repository>
<snapshots>
<enabled>true</enabled>
</snapshots>
<id>boundless</id>
<name>Boundless Maven Repository</name>
<url>http://repo.boundlessgeo.com/main</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<inherited>true</inherited>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
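Two notes on this pom: Guava is excluded from spring-boot-starter-web and pinned to 14.0.1, the version Spark 2.4 is built against, to avoid a classpath conflict; and the embedded Tomcat is swapped for Undertow. The pom also declares no spring-boot-maven-plugin, so to run the app via mvn spring-boot:run or package an executable jar you would still need to add one; a minimal sketch of the missing plugin entry:
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>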
Application bootstrap class:
import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.context.annotation.ComponentScan

@ComponentScan
@SpringBootApplication
class Config
object springsparkdemoApplication extends App {
  // forward the command-line args captured by the App trait
  SpringApplication.run(classOf[Config], args: _*)
}
A Spring @Configuration that creates the SparkConf and SparkContext beans when Spring Boot starts:
import org.apache.spark.{SparkConf, SparkContext}
import org.springframework.context.annotation.{Bean, Configuration}

@Configuration
class Sparkconfig {
  private val appName = "sparkTest"
  private val master = "local[*]"

  @Bean
  def sparkConf: SparkConf = new SparkConf().setAppName(appName).setMaster(master)

  @Bean
  def sparkContext: SparkContext = new SparkContext(sparkConf)
}
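With the SparkContext exposed as a bean, any Spring component can run a job in response to a request, which is the point of the whole setup. A minimal sketch (the PiController name and /pi endpoint are mine, not part of the original demo):
import org.apache.spark.SparkContext
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.web.bind.annotation.{GetMapping, RestController}

@RestController
class PiController {
  @Autowired
  private var sc: SparkContext = _ // the bean defined in Sparkconfig

  // Monte Carlo estimate of pi, computed by Spark on each HTTP request
  @GetMapping(value = Array("/pi"))
  def pi(): String = {
    val n = 100000
    val hits = sc.parallelize(1 to n).filter { _ =>
      val (x, y) = (math.random * 2 - 1, math.random * 2 - 1)
      x * x + y * y <= 1
    }.count()
    s"pi ~= ${4.0 * hits / n}"
  }
}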
Swagger configuration:
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.context.annotation.{Bean, Configuration}
import springfox.documentation.builders.{ApiInfoBuilder, PathSelectors, RequestHandlerSelectors}
import springfox.documentation.spi.DocumentationType
import springfox.documentation.spring.web.plugins.Docket
import springfox.documentation.swagger2.annotations.EnableSwagger2

@Configuration
@EnableSwagger2
@ConditionalOnProperty(name = Array("swagger.enable"), havingValue = "true")
class SwaggerConfiguration {
  @Bean
  def createRestApi: Docket = {
    val basePackage = "com.itudou.bd.Controller"
    new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo).select
      .apis(RequestHandlerSelectors.basePackage(basePackage)).paths(PathSelectors.any).build
  }
  private def apiInfo = new ApiInfoBuilder().title("smartdata-uc").description("").termsOfServiceUrl("").contact("").version("1.0").build
}
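Because of the @ConditionalOnProperty annotation, the Docket bean is only created when the property is set, e.g. in src/main/resources/application.properties:
swagger.enable=true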
REST endpoints that invoke GeoTrellis:
import io.swagger.annotations.{Api, ApiOperation}
import org.apache.spark.SparkContext
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.web.bind.annotation._

@Api(value = "GeoTrellisController", description = "GeoTrellisController")
@RestController
@RequestMapping(value = Array("GeoTrellisController/data/"))
@CrossOrigin
class GeoTrellisController {
  @Autowired
  private var sc: SparkContext = _ // the bean from Sparkconfig

  @ApiOperation(value = "vectorToRaster")
  @GetMapping(value = Array("vectorToRaster"))
  def vectorToRaster(): Unit = {
    // DemoHandle.vectorToRaster("D:\\log\\61011_geo\\610111.shp", "D:\\log\\61011_geo\\610111.tif", sc)
  }

  @ApiOperation(value = "etl")
  @GetMapping(value = Array("etl"))
  def etl(): Unit = {
    val args = Array[String](
      "--input", "D:\\log\\61011_geo\\input.json",
      "--output", "D:\\log\\61011_geo\\output.json",
      "--backend-profiles", "D:\\log\\61011_geo\\backend-profiles.json"
    )
    DemoHandle.etl(args)
  }
}
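Once the app is up, the endpoints can be exercised from the Swagger UI (served at /swagger-ui.html with springfox 2.8) or hit directly, e.g. (assuming the default port 8080):
curl http://localhost:8080/GeoTrellisController/data/etl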
Two GeoTrellis examples adapted from the web:
import java.io.File
import java.nio.charset.Charset

import geotrellis.raster._
import geotrellis.raster.render.{ColorMap, RGB}
import geotrellis.spark._
import geotrellis.spark.tiling.LayoutDefinition
import geotrellis.spark.util.SparkUtils
import geotrellis.vector.{Extent, Geometry}
import geotrellis.vector.io.wkt.WKT
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.geotools.data.shapefile.ShapefileDataStore

import scala.collection.mutable

object DemoHandle {
  // binary color map: 0 -> black, 1 -> white
  val colorMap1 =
    ColorMap(
      Map(
        0 -> RGB(0, 0, 0),
        1 -> RGB(255, 255, 255)
      )
    )

  // Read a shapefile with GeoTools and convert each feature's default
  // geometry (attribute 0) into a GeoTrellis Geometry via its WKT form.
  def getFeatures(path: String, charset: String = "UTF-8"): mutable.ListBuffer[Geometry] = {
    val features = mutable.ListBuffer[Geometry]()
    val shpDataStore = new ShapefileDataStore(new File(path).toURI.toURL)
    shpDataStore.setCharset(Charset.forName(charset))
    val typeName = shpDataStore.getTypeNames()(0)
    val featureSource = shpDataStore.getFeatureSource(typeName)
    val iterator = featureSource.getFeatures().features()
    while (iterator.hasNext) {
      val sf = iterator.next()
      features += WKT.read(sf.getAttribute(0).toString)
    }
    iterator.close()
    shpDataStore.dispose()
    features
  }

  def vectorToRaster(orgpath: String, armpath: String, sc: SparkContext): Unit = {
    val features = getFeatures(orgpath)
    // overall bounding box of all features
    val envelopes = features.map(_.jtsGeom.getEnvelopeInternal)
    val minX = envelopes.map(_.getMinX).min
    val minY = envelopes.map(_.getMinY).min
    val maxX = envelopes.map(_.getMaxX).max
    val maxY = envelopes.map(_.getMaxY).max

    val geoms: RDD[Geometry] = sc.parallelize(features)
    val extent: Extent = Extent(minX, minY, maxX, maxY)
    val tl = TileLayout(100, 72, 256, 256)
    val layout = LayoutDefinition(extent, tl)
    val celltype: CellType = IntCellType
    // burn value 1 so the cells match colorMap1 (the original burned 36,
    // a value colorMap1 does not map)
    val layer: RDD[(SpatialKey, Tile)] with Metadata[LayoutDefinition] =
      geoms.rasterize(1, celltype, layout)
    // one PNG per tile; writing every tile to the same path (as the original
    // did) would overwrite all but the last tile
    for ((key, tile) <- layer.collect()) {
      tile.renderPng(colorMap1).write(s"$armpath-${key.col}-${key.row}.png")
    }
  }

  def etl(args: Array[String]): Unit = {
    // note: only one SparkContext may be active per JVM, so this will
    // conflict with the Sparkconfig bean if both are live at once
    implicit val sc = SparkUtils.createSparkContext("ETL", new SparkConf(true).setMaster("local[*]"))
    try {
      //Etl.ingest[ProjectedExtent, SpatialKey, Tile](args)
    } finally {
      sc.stop
    }
  }
}
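The etl endpoint expects the three JSON files referenced above. Their schema comes from the now-deprecated geotrellis-spark-etl module; what follows is a minimal sketch whose field names follow that module's documentation, with placeholder names and paths that are illustrative only.
input.json:
[
  {
    "format": "geotiff",
    "name": "demo-layer",
    "cache": "NONE",
    "backend": {
      "type": "hadoop",
      "path": "file:///D:/log/61011_geo/input"
    }
  }
]
output.json:
{
  "backend": { "type": "file", "path": "D:/log/61011_geo/catalog" },
  "reprojectMethod": "buffered",
  "pyramid": true,
  "tileSize": 256,
  "keyIndexMethod": { "type": "zorder" },
  "resampleMethod": "nearest-neighbor",
  "layoutScheme": "zoomed",
  "crs": "EPSG:3857"
}
backend-profiles.json (can stay empty when no Accumulo/Cassandra credentials are needed):
{ "backend-profiles": [] }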