import org.apache.spark.{SparkConf, SparkContext}

object GetSC {
  // Shared helper: builds a SparkContext from a master URL and app name
  def getSC(masterName: String, appName: String): SparkContext = {
    val conf = new SparkConf()
    conf.setMaster(masterName)
    conf.setAppName(appName)
    new SparkContext(conf)
  }
}
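GetSC centralizes SparkContext creation so the jobs below don't repeat the SparkConf boilerplate. On Spark 2.x the same context can also be obtained through SparkSession; a minimal sketch, assuming the spark-sql_2.11 dependency is added alongside spark-core (it is not in the pom below), with GetSCviaSession as a hypothetical name:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object GetSCviaSession {
  // Equivalent helper built on SparkSession, the Spark 2.x entry point;
  // requires spark-sql_2.11 in addition to spark-core.
  def getSC(masterName: String, appName: String): SparkContext = {
    val spark = SparkSession.builder()
      .master(masterName)
      .appName(appName)
      .getOrCreate()
    spark.sparkContext
  }
}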
pv
object pv {
  def main(args: Array[String]): Unit = {
    val sc = GetSC.getSC("local[2]", "pv")
    sc.setLogLevel("WARN")
    // TODO: read the data
    val file = sc.textFile("G:\\A_实训前置1\\08spark\\access.log")
    // TODO: map each input line to ("pv", 1)
    val pvAndOne = file.map(x => ("pv", 1))
    // TODO: aggregate and print
    val totalPV = pvAndOne.reduceByKey(_ + _)
    totalPV.foreach(println) // (pv,14619)
    sc.stop()
  }
}
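Because every log line contributes exactly one page view, the map/reduceByKey pair is equivalent to counting lines; count() gives the same figure without the intermediate key/value RDD. A minimal sketch (pvCount is a hypothetical name):

object pvCount {
  def main(args: Array[String]): Unit = {
    val sc = GetSC.getSC("local[2]", "pvCount")
    sc.setLogLevel("WARN")
    // PV is simply the number of log lines, so count() returns the same
    // figure without building a ("pv", 1) RDD first.
    val totalPV = sc.textFile("G:\\A_实训前置1\\08spark\\access.log").count()
    println(("pv", totalPV)) // expected to match the reduceByKey result
    sc.stop()
  }
}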
uv
object uv {
  def main(args: Array[String]): Unit = {
    val sc = GetSC.getSC("local[2]", "uv")
    sc.setLogLevel("WARN")
    // TODO: read the data
    val file = sc.textFile("G:\\A_实训前置1\\08spark\\access.log")
    // TODO: split each line and take the IP address (first field)
    val ips = file.map(_.split(" ")).map(x => x(0))
    // TODO: deduplicate the IPs, then emit ("UV", 1) for each distinct one
    val uvAndOne = ips.distinct().map(x => ("UV", 1))
    // TODO: aggregate and print
    val totalUV = uvAndOne.reduceByKey(_ + _)
    totalUV.foreach(println) // (UV,1050)
    // TODO: save the result
    totalUV.saveAsTextFile("G:\\A_实训前置1\\08spark\\out")
    sc.stop()
  }
}
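Since UV is just the number of distinct client IPs, distinct().count() yields the figure directly without the ("UV", 1) detour. A minimal sketch (uvCount is a hypothetical name; like the code above, it assumes the IP is the first whitespace-separated field of each line):

object uvCount {
  def main(args: Array[String]): Unit = {
    val sc = GetSC.getSC("local[2]", "uvCount")
    sc.setLogLevel("WARN")
    val file = sc.textFile("G:\\A_实训前置1\\08spark\\access.log")
    // UV = number of distinct client IPs; x(0) assumes the IP is the
    // first whitespace-separated field of each log line.
    val totalUV = file.map(_.split(" ")(0)).distinct().count()
    println(("UV", totalUV)) // expected to match the reduceByKey result
    sc.stop()
  }
}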
topN
object topN {
  def main(args: Array[String]): Unit = {
    val sc = GetSC.getSC("local[2]", "topN")
    sc.setLogLevel("WARN")
    // TODO: read the data
    val file = sc.textFile("G:\\A_实训前置1\\08spark\\access.log")
    // Map each line to (referrer URL, 1); field 10 holds the referrer,
    // so drop malformed lines with fewer than 11 fields
    val refUrlAndOne = file.map(_.split(" ")).filter(_.length > 10).map(x => (x(10), 1))
    // Aggregate, then sort by count in descending order
    val result = refUrlAndOne.reduceByKey(_ + _).sortBy(_._2, false)
    // Use take to get the top N; here the top 5
    val finalResult = result.take(5)
    println(finalResult.toBuffer)
    // ArrayBuffer(("-",5205), ("http://blog.fens.me/category/hadoop-action/",547),
    //   ("http://blog.fens.me/",377),
    //   ("http://blog.fens.me/wp-admin/post.php?post=2445&action=edit&message=10",360),
    //   ("http://blog.fens.me/r-json-rjson/",274))
    sc.stop()
  }
}
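sortBy performs a full shuffle even though only five rows are kept. RDD.top(n) with an ordering on the count instead selects candidates per partition and merges them on the driver, which is cheaper on large logs. A sketch under the same field-10 assumption (topNAlt is a hypothetical name):

object topNAlt {
  def main(args: Array[String]): Unit = {
    val sc = GetSC.getSC("local[2]", "topNAlt")
    sc.setLogLevel("WARN")
    val file = sc.textFile("G:\\A_实训前置1\\08spark\\access.log")
    val counts = file.map(_.split(" ")).filter(_.length > 10).map(x => (x(10), 1))
      .reduceByKey(_ + _)
    // top(n) keeps only n candidates per partition instead of fully
    // sorting the RDD, then merges them on the driver.
    val top5 = counts.top(5)(Ordering.by[(String, Int), Int](_._2))
    println(top5.toBuffer)
    sc.stop()
  }
}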
pom.xml
<properties>
<scala.version>2.11.8</scala.version>
<spark.version>2.1.3</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
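With this build section, `mvn package` produces a shaded (fat) jar under target/. Any of the objects above can then be run with spark-submit, roughly: `spark-submit --class pv --master local[2] target/<artifactId>-<version>.jar` (the jar name depends on your own artifactId and version). The `<mainClass>` element is left empty above; filling it in, e.g. with `pv`, makes the jar's manifest point at a default driver class so `--class` can be omitted.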