
Implementing a Two-Table Query in Spark (SparkCore and SparkSQL)

Project requirements:

ip.txt: each line holds the start address and end address of an IP range, plus the province that range belongs to.

access.txt: each line holds an IP address plus assorted access data.

Requirement: join the two tables and count the number of IP hits per province.
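For reference, the code below splits each line on "|" and reads fields 2, 3, and 6 of ip.txt (range start as a number, range end as a number, province) and field 1 of access.txt (the IP address). Lines shaped roughly like the following would fit; the values here are made up for illustration:

ip.txt:     1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州|...
access.txt: 20090121000132095572000|125.213.100.123|[other access fields...]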

SparkCore

Broadcast the small table to every executor, then look up each record of the large table against it.
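Stripped to its essentials, the pattern looks like this (a toy sketch, assuming a SparkContext named sc is already in scope; the full job follows below):

// Ship the small table to every executor once, then do executor-local lookups
val small = sc.broadcast(Map(1 -> "a", 2 -> "b"))
val big = sc.parallelize(Seq(1, 2, 2, 3))
val looked = big.map(k => small.value.getOrElse(k, "?"))   // no shuffle needed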

package day07

import java.sql.DriverManager
import org.apache.log4j.{Level, Logger}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object IPLocation {

  val ipFile = "d:\\data\\spark\\ip.txt"
  val accessFile = "d:\\data\\spark\\access.log"

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("IpLocation").setMaster("local[3]")
    val sc = new SparkContext(conf)

    // 1. Read the IP rule file
    val lines = sc.textFile(ipFile)

    // 2. Parse the IP rules
    val ipRules = lines.map(x => {
      val splited = x.split("[|]")
      val startNum = splited(2).toLong
      val endNum = splited(3).toLong
      val province = splited(6)
      (startNum, endNum, province)
    })
    //println(ipRules.collect().toBuffer)

    // 3. Collect the rules to the Driver
    val ipDriver: Array[(Long, Long, String)] = ipRules.collect()

    // 4. Broadcast the rules to the executors.
    // broadcast() blocks until the broadcast completes, then returns a
    // reference to the broadcast variable on the Driver side.
    val broadcastRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(ipDriver)

    // 5. Read the access log
    val access = sc.textFile(accessFile)

    // 6. Parse the access log
    val provinces = access.map(x => {
      val fields = x.split("[|]")
      val ip = fields(1)
      val ipNum = MyUtils.ip2Long(ip)
      // Fetch the broadcast rules on the executor, then match against them
      val allIpRulesExecutor = broadcastRef.value
      // Locate the matching rule with a binary search
      var province = "未知" // default when no rule matches
      val index = MyUtils.binarySearch(allIpRulesExecutor, ipNum)
      if (index != -1) {
        province = allIpRulesExecutor(index)._3
      }
      (province, 1)
    })

    // 7. Count by province
    val reduceRDD: RDD[(String, Int)] = provinces.reduceByKey(_ + _)

    // 8. Print the result
    //reduceRDD.foreach(println)

    // 9. Save the result to MySQL
    /**
      * reduceRDD.foreach(x => {
      *   val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&useSSL=true", "root", "123456")
      *   val pstm = conn.prepareStatement("insert into access_log values (?,?)")
      *   pstm.setString(1, x._1)
      *   pstm.setInt(2, x._2)
      *   pstm.execute()
      *   pstm.close()
      *   conn.close()
      * })
      */
    //MyUtils.data2MySQL(reduceRDD.collect().toIterator)
    reduceRDD.foreachPartition(MyUtils.data2MySQL(_))

    sc.stop()
  }
}
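Note that the JDBC insert above presupposes a two-column access_log table. A minimal definition that would satisfy it, run once before the job (the column names are assumptions; only the shape matters):

// One-off table setup; column names are illustrative
val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
conn.createStatement().execute(
  "CREATE TABLE IF NOT EXISTS access_log (province VARCHAR(64), counts INT)")
conn.close()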

SparkSQL

1. Load the data from both files, convert it to DataFrames, and register two views; then run a join query.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Dataset, SparkSession}
import day07.MyUtils

object IPDemo {

  val ipFile = "d:\\data\\spark\\ip.txt"
  val accessFile = "d:\\data\\spark\\access.log"

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val spark = SparkSession.builder().appName("Ip").master("local[*]").getOrCreate()
    import spark.implicits._

    // Read the IP rule file
    val ipLines = spark.read.textFile(ipFile)

    // Parse the IP rules
    val ipRules: Dataset[(Long, Long, String)] = ipLines.map(line => {
      val splited = line.split("[|]")
      (splited(2).toLong, splited(3).toLong, splited(6))
    })

    // Attach column names
    val ipDF = ipRules.toDF("start_num", "end_num", "province")

    // Register the IP rules as a view
    ipDF.createTempView("t_ip")

    // Read the access log
    val access_file = spark.read.textFile(accessFile)

    // Parse the access log: keep only the IP, converted to a number
    val accessDF = access_file.map(line => {
      val fields = line.split("[|]")
      MyUtils.ip2Long(fields(1))
    }).toDF("ip")

    // Register the access log as a view
    accessDF.createTempView("t_access")

    // Join the two views with SQL
    val result = spark.sql("SELECT province, count(*) counts FROM t_ip JOIN t_access ON ip >= start_num AND ip <= end_num GROUP BY province ORDER BY counts DESC")
    result.show()

    spark.stop()
  }
}

2. An improved approach

When the data is large, joining the two tables directly gets slow. Instead, broadcast the IP rules to the executors and wrap the lookup in a user-defined function.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import day07.MyUtils

object IpLocation {

  val ipFile = "d:\\data\\spark\\ip.txt"
  val accessFile = "d:\\data\\spark\\access.log"

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val spark = SparkSession.builder().appName("SQLIPLocation").master("local[*]").getOrCreate()
    // Implicit conversions for Dataset operations
    import spark.implicits._

    // Parse the IP rules, as before
    val ipRules = spark.read.textFile(ipFile).map(line => {
      val splited = line.split("[|]")
      (splited(2).toLong, splited(3).toLong, splited(6))
    })

    // Collect all the IP rules to the Driver
    val ipRulesDriver = ipRules.collect()

    // Broadcast them; broadcast() blocks until the broadcast completes
    val broadcastRef = spark.sparkContext.broadcast(ipRulesDriver)

    // Read the web log
    val accessLogLines = spark.read.textFile(accessFile)

    val ips = accessLogLines.map(line => {
      val fields = line.split("[|]")
      MyUtils.ip2Long(fields(1))
    }).toDF("ip_num")

    // Register the access log as a view
    ips.createTempView("access_ip")

    // Define and register a UDF.
    // The UDF is defined on the Driver, but its body runs on the Executors;
    // the broadcast reference is shipped along with each task.
    spark.udf.register("ip_num2Province", (ip_num: Long) => {
      val rulesExecutor: Array[(Long, Long, String)] = broadcastRef.value
      val index = MyUtils.binarySearch(rulesExecutor, ip_num)
      if (index != -1) rulesExecutor(index)._3 else "未知"
    })

    val result = spark.sql("select ip_num2Province(ip_num) province, count(*) counts from access_ip group by province order by counts desc")
    result.show()

    spark.stop()
  }
}
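As an alternative to the UDF, Spark SQL can also be told outright to broadcast the small side of a join via the broadcast hint in org.apache.spark.sql.functions; the range condition then runs as a broadcast nested-loop join with no shuffle. A sketch, assuming the ipDF and accessDF frames from the first version are in scope:

import org.apache.spark.sql.functions.broadcast

// Hint Spark to broadcast the small rule table to every executor
val joined = accessDF.join(broadcast(ipDF),
  $"ip" >= $"start_num" && $"ip" <= $"end_num")
joined.groupBy("province").count().orderBy($"count".desc).show()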

3. The MyUtils helper used above:

import java.sql.{Connection, DriverManager, PreparedStatement}

/**
  * Created by zx on 2017/12/12.
  */
object MyUtils {

  // Convert a dotted IP string to a Long by treating the four octets
  // as the digits of a base-256 number
  def ip2Long(ip: String): Long = {
    val fragments = ip.split("[.]")
    var ipNum = 0L
    for (i <- 0 until fragments.length) {
      ipNum = fragments(i).toLong | ipNum << 8L
    }
    ipNum
  }

  // Binary-search the sorted rules for the range containing ip;
  // returns the index of the matching rule, or -1 if none matches
  def binarySearch(lines: Array[(Long, Long, String)], ip: Long): Int = {
    var low = 0
    var high = lines.length - 1
    while (low <= high) {
      val middle = (low + high) / 2
      if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))
        return middle
      if (ip < lines(middle)._1)
        high = middle - 1
      else
        low = middle + 1
    }
    -1
  }

  // Insert the (province, count) pairs of one partition into MySQL,
  // sharing a single connection per partition
  def data2MySQL(iter: Iterator[(String, Int)]): Unit = {
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    val ps = conn.prepareStatement("insert into access_log values (?,?)")
    iter.foreach(x => {
      ps.setString(1, x._1)
      ps.setInt(2, x._2)
      ps.executeUpdate()
    })
    if (ps != null) ps.close()
    if (conn != null) conn.close()
  }
}
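A quick sanity check of the two helpers, with hand-computed values:

MyUtils.ip2Long("1.0.1.0")               // 1*2^24 + 0*2^16 + 1*2^8 + 0 = 16777472
val rules = Array((16777472L, 16778239L, "福建"))
MyUtils.binarySearch(rules, 16777600L)   // 0, since the ip falls inside rules(0)
MyUtils.binarySearch(rules, 1L)          // -1, no range matches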

————————————————

Copyright notice: This is an original article by the blogger 曼路, released under the CC 4.0 BY-SA license. Please include a link to the original article and this notice when reposting.