天天看点

如何构建第一个Spark项目代码如何构建第一个Spark项目代码

如何构建第一个Spark项目代码

环境准备

本地环境

  1. 操作系统

    Window7/Mac

  2. IDE

    IntelliJ IDEA Community Edition 14.1.6

    下载地址

  3. JDK 1.8.0_65

    下载地址

  4. Scala 2.11.7

    下载地址

其它环境

  1. Spark:1.4.1

    下载地址

  2. Hadoop Yarn:Hadoop 2.5.0-cdh5.3.2

IDE项目创建

新建一个项目

  1. New Project
    如何构建第一个Spark项目代码如何构建第一个Spark项目代码
  2. 使用Maven模型创建一个Scala项目
    如何构建第一个Spark项目代码如何构建第一个Spark项目代码
  3. 填写自己的GroupId、ArtifactId,Version不需要修改,Maven会根据GroupId生成相应的目录结构,GroupId的取值一般为a.b.c 结构,ArtifactId为项目名称。之后点击next,填写完项目名称和目录,点击finish就可以让maven帮你创建Scala项目
    如何构建第一个Spark项目代码如何构建第一个Spark项目代码
    项目创建完成后,目录结构如下
    如何构建第一个Spark项目代码如何构建第一个Spark项目代码

    4.为项目添加JDK以及Scala SDK

    点击File->Project Structure,在SDKS和Global Libraries中为项目配置环境。

    如何构建第一个Spark项目代码如何构建第一个Spark项目代码
    至此整个项目结构、项目环境都搭建好了

编写主函数

主函数的编写在 projectName/src/main/scala/…/下完成,如果按照上述步骤完成代码搭建,将在目录最后发现

MyRouteBuild
MyRouteMain
           

这两个文件为模块文件,删除

MyRouteBuild

,重命名

MyRouteMain

DirectKafkaWordCount

。这里,我使用Spark Streaming官方提供的一个代码为实例代码,代码如下

package org.apache.spark.examples.streaming

import kafka.serializer.StringDecoder

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < ) {
      System.err.println("...")
      System.exit()
    }
    //StreamingExamples.setStreamingLogLevels()

    val Array(brokers, topics) = args

    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds())

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
           

将代码最上面的

package org.apache.spark.examples.streaming

,替换为

DirectKafkaWordCount

里的

package

部分即可。并覆盖

DirectKafkaWordCount

文件。

至此Spark处理代码已经编写完成。

修改

pom.xml

,为项目打包做准备

pom.xml

中编写了整个项目的依赖关系,这个项目中我们需要导入一些

Spark Streaming

相关的包。

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.4.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.4.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.4.1</version>
</dependency>

<!-- scala -->
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.10.4</version>
</dependency>
           

除此之外,如果需要把相关依赖打包到最终

JAR

包中,需要在

pom.xml

的bulid标签中写入以下配置:

<plugins>
      <!-- Plugin to create a single jar that includes all dependencies -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.0.2</version>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <executions>
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
              <goal>add-source</goal>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>scala-test-compile</id>
            <phase>process-test-resources</phase>
            <goals>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>      
    </plugins>
           

pom.xml

文件修改完成后,即可开始maven打包,操作如图:

如何构建第一个Spark项目代码如何构建第一个Spark项目代码

点击右侧弹出窗口的Execute Maven Goal,在

command line

中输入

clean package

如何构建第一个Spark项目代码如何构建第一个Spark项目代码

Spark作业提交

在项目

projectname/target

目录下即可找到两个

jar

包,其中一个仅包含Scala代码,另一个包含所有依赖的包。

jar

包导到Spark服务器,运行Spark作业,运行操作如下

../bin/spark-submit –master yarn-client –jars ../lib/kafka_2.10-0.8.2.1.jar –class huochen.spark.example.DirectKafkaWordCount sparkExample-1.0-SNAPSHOT-jar-with-dependencies.jar kafka-broker topic

利用

spark-submit

把任务提交到Yarn集群,即可看到运行结果。

Q&A

  1. 带有依赖的jar包有80+M,似乎加上了许多没用的依赖包,这是正常情况还是由于

    pom.xml

    配置错误导致的?

Reference

Spark Streaming + Kafka Integration Guide

http://spark.apache.org/docs/1.4.1/streaming-kafka-integration.html

Running Spark on YARN

http://spark.apache.org/docs/1.4.1/running-on-yarn.html

继续阅读