
spark_PV_UV_TopN

Table of contents

    • pv
    • uv
    • topN
    • pom.xml
import org.apache.spark.{SparkConf, SparkContext}

// Shared helper: builds a SparkContext from a master URL and an application name
object GetSC {
  def getSC(masterName: String, appName: String): SparkContext = {
    val conf = new SparkConf()
    conf.setMaster(masterName)
    conf.setAppName(appName)
    new SparkContext(conf)
  }
}
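A side note on re-running these jobs inside one JVM (for example from an IDE): constructing a second SparkContext fails. A minimal variant using SparkContext.getOrCreate, which reuses an existing context instead; GetSCSafe is a hypothetical name added here for illustration, not part of the original code.

import org.apache.spark.{SparkConf, SparkContext}

// Variant of the helper: getOrCreate returns the already-running SparkContext
// if there is one in this JVM, and only creates a new one otherwise.
object GetSCSafe {
  def getSC(masterName: String, appName: String): SparkContext = {
    val conf = new SparkConf().setMaster(masterName).setAppName(appName)
    SparkContext.getOrCreate(conf)
  }
}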
           

pv

object pv {
  def main(args: Array[String]): Unit = {
    val sc = GetSC.getSC("local[2]", "pv")
    sc.setLogLevel("WARN")

    // Read the access log
    val file = sc.textFile("G:\\A_实训前置1\\08spark\\access.log")

    // Map each line to ("pv", 1): every request line counts as one page view
    val pvAndOne = file.map(x => ("pv", 1))

    // Aggregate and print the total
    val totalPV = pvAndOne.reduceByKey(_ + _)
    totalPV.foreach(println)  // (pv,14619)

    sc.stop()
  }
}
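Since PV is simply the number of request lines, the same total can also be read off with count(), skipping the map and reduceByKey. A minimal sketch under the same assumptions (same log path, same GetSC helper); pvCount is a name added here for illustration.

object pvCount {
  def main(args: Array[String]): Unit = {
    val sc = GetSC.getSC("local[2]", "pvCount")
    sc.setLogLevel("WARN")

    // count() returns the number of lines, which is exactly the PV total
    val totalPV = sc.textFile("G:\\A_实训前置1\\08spark\\access.log").count()
    println(s"(pv,$totalPV)")

    sc.stop()
  }
}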
           

uv

object uv {
  def main(args: Array[String]): Unit = {
    val sc = GetSC.getSC("local[2]", "uv")
    sc.setLogLevel("WARN")

    // Read the access log
    val file = sc.textFile("G:\\A_实训前置1\\08spark\\access.log")

    // Split each line on spaces and take the first field, the client IP
    val ips = file.map(_.split(" ")).map(x => x(0))

    // Deduplicate the IPs, then emit ("UV", 1) for each distinct visitor
    val uvAndOne = ips.distinct().map(x => ("UV", 1))

    // Aggregate and print the total
    val totalUV = uvAndOne.reduceByKey(_ + _)
    totalUV.foreach(println)    // (UV,1050)

    // Save the result
    totalUV.saveAsTextFile("G:\\A_实训前置1\\08spark\\out")
    sc.stop()
  }
}
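The UV figure is just the number of distinct IPs, so distinct().count() gives the same result, and on very large logs countApproxDistinct can trade a little accuracy for less shuffling. A hedged sketch with the same log path; uvCount and the 0.01 relative error are illustrative choices, not part of the original job.

object uvCount {
  def main(args: Array[String]): Unit = {
    val sc = GetSC.getSC("local[2]", "uvCount")
    sc.setLogLevel("WARN")

    val ips = sc.textFile("G:\\A_实训前置1\\08spark\\access.log")
      .map(_.split(" ")(0))

    // Exact UV: the number of distinct IPs
    println(s"(UV,${ips.distinct().count()})")

    // Approximate UV via HyperLogLog; 0.01 is the target relative error
    println(s"(UV~,${ips.countApproxDistinct(0.01)})")

    sc.stop()
  }
}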
           

topN

object topN {
  def main(args: Array[String]): Unit = {
    val sc = GetSC.getSC("local[2]", "topN")
    sc.setLogLevel("WARN")

    // Read the access log
    val file = sc.textFile("G:\\A_实训前置1\\08spark\\access.log")

    // Map each line to (referer URL, 1); drop lines with fewer than 11 fields
    val refUrlAndOne = file.map(_.split(" ")).filter(_.length > 10).map(x => (x(10), 1))

    // Aggregate per URL, then sort by count in descending order
    val result = refUrlAndOne.reduceByKey(_ + _).sortBy(_._2, false)

    // take() returns the first N elements; here, the top 5 referers
    val finalResult = result.take(5)
    println(finalResult.toBuffer)
    // ArrayBuffer(("-",5205), ("http://blog.fens.me/category/hadoop-action/",547),
    // ("http://blog.fens.me/",377),
    // ("http://blog.fens.me/wp-admin/post.php?post=2445&action=edit&message=10",360),
    // ("http://blog.fens.me/r-json-rjson/",274))

    sc.stop()
  }
}
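sortBy performs a full sort of every referer only to keep five of them; RDD.top can pull the N largest elements directly, given an ordering on the count. A minimal sketch under the same assumptions; topNDirect is a name added here for illustration.

object topNDirect {
  def main(args: Array[String]): Unit = {
    val sc = GetSC.getSC("local[2]", "topNDirect")
    sc.setLogLevel("WARN")

    val counts = sc.textFile("G:\\A_实训前置1\\08spark\\access.log")
      .map(_.split(" "))
      .filter(_.length > 10)
      .map(x => (x(10), 1))
      .reduceByKey(_ + _)

    // top(5) keeps only the 5 largest elements per partition and merges them,
    // avoiding a full sort of all referers; results come back in descending order
    val top5 = counts.top(5)(Ordering.by[(String, Int), Int](_._2))
    println(top5.toBuffer)

    sc.stop()
  }
}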
           

pom.xml

    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.1.3</spark.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
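With this build section, mvn clean package produces a shaded jar under target/; its exact name depends on the project's artifactId and version, which are not shown in this snippet. Because <mainClass> is left empty, the jar carries no Main-Class manifest entry, so the class to run (pv, uv or topN) is typically passed to spark-submit with --class. The signature-file excludes (*.SF, *.DSA, *.RSA) are there to avoid "invalid signature file digest" errors from the merged jar. Note also that sourceDirectory points at src/main/java, so the Scala sources above must live there, or the path should be changed to src/main/scala.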
           
