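Project requirements:
ip.txt: contains the start address, end address, and province of each IP range
access.txt: contains the IP address and other access-log fields
Task: join the two tables and count the number of access records (IPs) per province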
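ip.txt stores each range as numeric start/end values (the code below reads them from fields 2 and 3 with toLong), so each IP in the access log has to be converted into the same kind of number before the ranges can be compared. MyUtils.ip2Long (listed at the end) does this by shifting the running value left 8 bits and OR-ing in the next octet, which for a dotted address a.b.c.d amounts to the formula below; 1.2.3.4 is only an illustrative address, and it is assumed that the numbers in ip.txt were computed the same way.

```latex
\begin{aligned}
\mathrm{ip2Long}(a.b.c.d) &= a \cdot 2^{24} + b \cdot 2^{16} + c \cdot 2^{8} + d \\
\mathrm{ip2Long}(1.2.3.4) &= 16\,777\,216 + 131\,072 + 768 + 4 = 16\,909\,060
\end{aligned}
```

Because this mapping preserves order, checking start ≤ ip ≤ end on the numbers is the same as checking that the address falls inside the range.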
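I. Spark Core
Use a broadcast variable: the small table (the IP rules) is broadcast to every executor, and each record of the large table (the access log) is looked up in it.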
package day07

import java.sql.DriverManager
import org.apache.log4j.{Level, Logger}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object IPLocation {
  val ipFile = "d:\\data\\spark\\ip.txt"
  val accessFile = "d:\\data\\spark\\access.log"

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("IpLocation").setMaster("local[3]")
    val sc = new SparkContext(conf)

    // 1. Read the IP rule file
    val lines: RDD[String] = sc.textFile(ipFile)
    // 2. Parse each rule into (start number, end number, province)
    val ipRules: RDD[(Long, Long, String)] = lines.map(x => {
      val splited = x.split("[|]")
      val startNum = splited(2).toLong
      val endNum = splited(3).toLong
      val province = splited(6)
      (startNum, endNum, province)
    })
    //println(ipRules.collect().toBuffer)
    // 3. Collect the IP rules to the driver
    val ipDriver: Array[(Long, Long, String)] = ipRules.collect()
    // 4. Broadcast the rules to the executors.
    // sc.broadcast stores the data on the driver and returns a handle; each executor
    // fetches (and caches) the data the first time one of its tasks calls .value
    val broadcastRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(ipDriver)
    // 5. Read the access log
    val access: RDD[String] = sc.textFile(accessFile)
    // 6. Map each access record to (province, 1)
    val provinces: RDD[(String, Int)] = access.map(x => {
      val fields = x.split("[|]")
      val ip = fields(1)
      val ipNum = MyUtils.ip2Long(ip)
      // Get all IP rules from the broadcast variable, then match against them
      val allIpRulesExecutor = broadcastRef.value
      // Look up the matching rule with a binary search; "未知" means "unknown"
      val index = MyUtils.binarySearch(allIpRulesExecutor, ipNum)
      val province = if (index != -1) allIpRulesExecutor(index)._3 else "未知"
      (province, 1)
    })
    // 7. Count per province
    val reduceRDD: RDD[(String, Int)] = provinces.reduceByKey(_ + _)
    // 8. Print the results
    //reduceRDD.foreach(println)
    // 9. Save the results to MySQL.
    // Opening a connection for every single record, as below, is expensive:
    /*
    reduceRDD.foreach(x => {
      val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&useSSL=true", "root", "123456")
      val pstm = conn.prepareStatement("insert into access_log values (?,?)")
      pstm.setString(1, x._1)
      pstm.setInt(2, x._2)
      pstm.execute()
      pstm.close()
      conn.close()
    })
    */
    // Collecting to the driver first only works for small results:
    //MyUtils.data2MySQL(reduceRDD.collect().toIterator)
    // Better: one connection per partition
    reduceRDD.foreachPartition(MyUtils.data2MySQL(_))
    sc.stop()
  }
}
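To make the map and reduceByKey steps above easier to follow and test in isolation, here is a local, Spark-free sketch of the same logic on plain Scala collections. It is an illustration under assumptions, not the author's code: the Rule case class and the LocalIpCount object and its methods are hypothetical names, and instead of the hand-written loop in MyUtils.binarySearch it swaps in java.util.Arrays.binarySearch over the sorted start addresses (a "floor" lookup). Like the original, it assumes the rules are sorted by start address and do not overlap.

```scala
import java.util.Arrays

// Hypothetical, Spark-free sketch of the per-record lookup plus the per-province count.
// Assumes rules are sorted by start address and their ranges do not overlap.
object LocalIpCount {
  final case class Rule(start: Long, end: Long, province: String)

  // Index of the rule whose [start, end] range contains ip, or -1 if none does.
  def findRule(starts: Array[Long], ends: Array[Long], ip: Long): Int = {
    val i = Arrays.binarySearch(starts, ip)
    // A negative result encodes -(insertionPoint) - 1; the only candidate rule is the
    // last one whose start is <= ip, i.e. index insertionPoint - 1.
    val floor = if (i >= 0) i else -i - 2
    if (floor >= 0 && ip <= ends(floor)) floor else -1
  }

  // Local equivalent of map(ip => (province, 1)).reduceByKey(_ + _).
  def countByProvince(rules: Array[Rule], ips: Seq[Long]): Map[String, Int] = {
    val starts = rules.map(_.start)
    val ends = rules.map(_.end)
    ips
      .map { ip =>
        val idx = findRule(starts, ends, ip)
        // "未知" (unknown) mirrors the fallback used in the Spark Core job
        if (idx != -1) rules(idx).province else "未知"
      }
      .groupBy(identity)
      .map { case (province, hits) => province -> hits.size }
  }
}
```

For example, with the rules [0, 9] → A, [10, 19] → B and [30, 39] → C, the IPs 1, 5, 12, 25 and 31 are counted as A → 2, B → 1, C → 1 and 未知 → 1.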
II. Spark SQL
1. Load the data of both tables, convert each into a DataFrame, register two views, and implement the lookup as a SQL join.
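import day07.MyUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object IPDemo {
  val ipFile = "d:\\data\\spark\\ip.txt"
  val accessFile = "d:\\data\\spark\\access.log"

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val spark = SparkSession.builder().appName("Ip").master("local[*]").getOrCreate()
    import spark.implicits._

    // Read the IP rule file
    val ipLines: Dataset[String] = spark.read.textFile(ipFile)
    // Parse the IP rules into (start number, end number, province)
    val ipRules: Dataset[(Long, Long, String)] = ipLines.map(line => {
      val splited = line.split("[|]")
      val startNum = splited(2).toLong
      val endNum = splited(3).toLong
      val province = splited(6)
      (startNum, endNum, province)
    })
    // Add column names (schema metadata)
    val ipDF: DataFrame = ipRules.toDF("start_num", "end_num", "province")
    // Register the IP rules as a view
    ipDF.createTempView("t_ip")

    // Read the access log and convert each IP to a number
    val accessLines: Dataset[String] = spark.read.textFile(accessFile)
    val accessDF: DataFrame = accessLines.map(line => {
      val fields = line.split("[|]")
      val ip = fields(1)
      MyUtils.ip2Long(ip)
    }).toDF("ip")
    // Register the access log as a view
    accessDF.createTempView("t_access")

    // SQL: join the two views on the IP range and count per province
    val result = spark.sql(
      "SELECT province, count(*) counts FROM t_ip JOIN t_access " +
        "ON ip >= start_num AND ip <= end_num " +
        "GROUP BY province ORDER BY counts DESC")
    result.show()
    spark.stop()
  }
}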
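The SQL query above asks for every (rule, access record) pair where the IP falls inside the range, then groups by province. A minimal local sketch of those semantics in plain Scala (the RangeJoinSketch object and IpRule case class are hypothetical names, not part of the original project) makes two properties visible: every access row is checked against every rule, and because the query uses an inner JOIN, IPs that match no rule are dropped instead of being counted under 未知 as in the Spark Core version.

```scala
// Hypothetical local sketch of what the range join + GROUP BY + ORDER BY computes:
//   SELECT province, count(*) counts FROM t_ip JOIN t_access
//   ON ip >= start_num AND ip <= end_num GROUP BY province ORDER BY counts DESC
object RangeJoinSketch {
  final case class IpRule(startNum: Long, endNum: Long, province: String)

  def joinAndCount(rules: Seq[IpRule], ips: Seq[Long]): Seq[(String, Int)] = {
    // Nested loop: each access IP is compared with each rule (inner join semantics,
    // so an IP with no matching rule produces no row at all).
    val joined = for {
      ip <- ips
      rule <- rules
      if ip >= rule.startNum && ip <= rule.endNum
    } yield rule.province
    joined
      .groupBy(identity)
      .map { case (province, rows) => (province, rows.size) }
      .toSeq
      // ORDER BY counts DESC; ties come out in no particular order
      .sortBy { case (_, counts) => -counts }
  }
}
```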
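2. Improved approach
Joining the two tables becomes slow when the data volume is large: the join condition is a range (ip >= start_num AND ip <= end_num) rather than an equality, so Spark has no key to hash or sort on and has to compare access records against IP rules pair by pair. Instead, the IP rules are broadcast to the executors, and a user-defined function (UDF) is registered that looks up each IP with a binary search.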
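A rough count shows why this helps. With n access records and m IP rules, a join that checks every pair evaluates the range condition n·m times, while the broadcast version runs one binary search per access record. For a hypothetical rule file with 100,000 ranges, that is at most 17 probes per record instead of 100,000 condition checks:

```latex
\begin{aligned}
\text{nested-loop range join:}\quad & n \cdot m \ \text{condition checks} \\
\text{broadcast + binary search:}\quad & \le n \cdot (\lfloor \log_2 m \rfloor + 1) \ \text{probes} \\
m = 100\,000:\quad & \lfloor \log_2 100\,000 \rfloor + 1 = 16 + 1 = 17 \ \text{probes per access record}
\end{aligned}
```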
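import day07.MyUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Dataset, SparkSession}

object IpLocation {
  val ipFile = "d:\\data\\spark\\ip.txt"
  val accessFile = "d:\\data\\spark\\access.log"

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val spark = SparkSession.builder().appName("SQLIPLocation").master("local[*]").getOrCreate()
    // Implicit conversions (encoders, toDF)
    import spark.implicits._

    // Parse the IP rules as in the previous example
    val ipRules: Dataset[(Long, Long, String)] = spark.read.textFile(ipFile).map(line => {
      val splited = line.split("[|]")
      (splited(2).toLong, splited(3).toLong, splited(6))
    })
    // Collect all IP rules to the driver
    val ipRulesDriver: Array[(Long, Long, String)] = ipRules.collect()
    // Broadcast them; executors fetch and cache the data on first use of .value
    val broadcastRef = spark.sparkContext.broadcast(ipRulesDriver)

    // Read the web access log and convert each IP to a number
    val accessLogLines: Dataset[String] = spark.read.textFile(accessFile)
    val ips = accessLogLines.map(line => {
      val fields = line.split("[|]")
      val ip = fields(1)
      MyUtils.ip2Long(ip)
    }).toDF("ip_num")
    // Register the access-log data as a view
    ips.createTempView("access_ip")

    // Define and register the UDF.
    // The function is defined on the driver, but its body runs on the executors:
    // when tasks are sent out, the driver-side broadcast reference is serialized
    // with them, and .value returns the executor's local copy of the rules.
    spark.udf.register("ip_num2Province", (ip_num: Long) => {
      val rulesExecutor: Array[(Long, Long, String)] = broadcastRef.value
      val index = MyUtils.binarySearch(rulesExecutor, ip_num)
      // "未知" means "unknown"
      if (index != -1) rulesExecutor(index)._3 else "未知"
    })

    val result = spark.sql(
      "SELECT ip_num2Province(ip_num) province, count(*) counts FROM access_ip " +
        "GROUP BY province ORDER BY counts DESC")
    result.show()
    spark.stop()
  }
}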
III. The utility code used above:
package day07

import java.sql.{Connection, DriverManager, PreparedStatement}

/**
 * Created by zx on 2017/12/12.
 */
object MyUtils {
  // Convert a dotted IPv4 address to a number
  def ip2Long(ip: String): Long = {
    val fragments = ip.split("[.]")
    var ipNum = 0L
    for (i <- 0 until fragments.length) {
      ipNum = fragments(i).toLong | ipNum << 8L
    }
    ipNum
  }

  // Find the index of the rule whose [start, end] range contains ip, or -1 if none does.
  // Assumes the rules are sorted by start address and do not overlap.
  def binarySearch(lines: Array[(Long, Long, String)], ip: Long): Int = {
    var low = 0
    var high = lines.length - 1
    while (low <= high) {
      val middle = (low + high) / 2
      if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))
        return middle
      if (ip < lines(middle)._1)
        high = middle - 1
      else {
        low = middle + 1
      }
    }
    -1
  }

  // Connect to MySQL and insert the (province, count) rows of one partition
  def data2MySQL(iter: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null
    var ps: PreparedStatement = null
    try {
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
      ps = conn.prepareStatement("insert into access_log values (?,?)")
      iter.foreach(x => {
        ps.setString(1, x._1)
        ps.setInt(2, x._2)
        ps.executeUpdate()
      })
    } finally {
      // Close the statement before the connection, even if an insert fails
      if (ps != null) ps.close()
      if (conn != null) conn.close()
    }
  }
}
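MyUtils.binarySearch only returns correct results if the collected rules are sorted by start address and no two ranges overlap; otherwise it can skip past the matching range and report 未知. The original code does not check this. A small hypothetical helper (RuleChecks.isSortedAndDisjoint, not part of the author's MyUtils) that could be run on the collected array before broadcasting:

```scala
// Hypothetical helper: verifies the precondition MyUtils.binarySearch relies on.
object RuleChecks {
  // True if every range is well-formed (start <= end) and each range ends strictly
  // before the next one starts, which also implies the starts are sorted ascending.
  def isSortedAndDisjoint(rules: Array[(Long, Long, String)]): Boolean =
    rules.forall { case (start, end, _) => start <= end } &&
      rules.zip(rules.drop(1)).forall {
        case ((_, prevEnd, _), (nextStart, _, _)) => prevEnd < nextStart
      }
}
```

If the check fails only because the file is out of order, sorting with ipRules.collect().sortBy(_._1) before broadcasting would be enough; overlapping ranges would have to be fixed in the data itself.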
————————————————
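Copyright notice: This is an original article by the blogger 「曼路」, licensed under CC 4.0 BY-SA. Reposts must include a link to the original source and this notice.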