天天看點

spark foreach mysql_spark foreachPartition 把df 資料插入到mysql

package com.waitingfy

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer

object foreachPartitionTest {

case class TopSongAuthor(songAuthor:String, songCount:Long)

def getConnection() = {

DriverManager.getConnection("jdbc:mysql://localhost:3306/baidusong?user=root&password=root&useUnicode=true&characterEncoding=UTF-8")

}

def release(connection: Connection, pstmt: PreparedStatement): Unit = {

try {

if (pstmt != null) {

pstmt.close()

}

} catch {

case e: Exception => e.printStackTrace()

} finally {

if (connection != null) {

connection.close()

}

}

}

def insertTopSong(list:ListBuffer[TopSongAuthor]):Unit ={

var connect:Connection = null

var pstmt:PreparedStatement = null

try{

connect = getConnection()

connect.setAutoCommit(false)

val sql = "insert into topSinger(song_author, song_count) values(?,?)"

pstmt = connect.prepareStatement(sql)

for(ele

pstmt.setString(1, ele.songAuthor)

pstmt.setLong(2,ele.songCount)

pstmt.addBatch()

}

pstmt.executeBatch()

connect.commit()

}catch {

case e:Exception => e.printStackTrace()

}finally {

release(connect, pstmt)

}

}

def main(args: Array[String]): Unit = {

val spark = SparkSession

.builder()

.master("local[2]")

.appName("foreachPartitionTest")

.getOrCreate()

val gedanDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306").option("dbtable", "baidusong.gedan").option("user", "root").option("password", "root").option("driver", "com.mysql.jdbc.Driver").load()

// mysqlDF.show()

val detailDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306").option("dbtable", "baidusong.gedan_detail").option("user", "root").option("password", "root").option("driver", "com.mysql.jdbc.Driver").load()

val joinDF = gedanDF.join(detailDF, gedanDF.col("id") === detailDF.col("gedan_id"))

// joinDF.show()

import spark.implicits._

val resultDF = joinDF.groupBy("song_author").agg(count("song_name").as("song_count")).orderBy($"song_count".desc).limit(100)

// resultDF.show()

resultDF.foreachPartition(partitionOfRecords =>{

val list = new ListBuffer[TopSongAuthor]

partitionOfRecords.foreach(info =>{

val song_author = info.getAs[String]("song_author")

val song_count = info.getAs[Long]("song_count")

list.append(TopSongAuthor(song_author, song_count))

})

insertTopSong(list)

})

spark.close()

}

}

預設的foreach的性能缺陷在哪裡?

首先,對于每條資料,都要單獨去調用一次function,task為每個資料,都要去執行一次function函數。

如果100萬條資料,(一個partition),調用100萬次。性能比較差。

另外一個非常非常重要的一點

如果每個資料,你都去建立一個資料庫連接配接的話,那麼你就得建立100萬次資料庫連接配接。

但是要注意的是,資料庫連接配接的建立和銷毀,都是非常非常消耗性能的。雖然我們之前已經用了

資料庫連接配接池,隻是建立了固定數量的資料庫連接配接。

你還是得多次通過資料庫連接配接,往資料庫(MySQL)發送一條SQL語句,然後MySQL需要去執行這條SQL語句。

如果有100萬條資料,那麼就是100萬次發送SQL語句。

以上兩點(資料庫連接配接,多次發送SQL語句),都是非常消耗性能的。

foreachPartition,在生産環境中,通常來說,都使用foreachPartition來寫資料庫的

使用批處理操作(一條SQL和多組參數)

發送一條SQL語句,發送一次

一下子就批量插入100萬條資料。

用了foreachPartition算子之後,好處在哪裡?

1、對于我們寫的function函數,就調用一次,一次傳入一個partition所有的資料

2、主要建立或者擷取一個資料庫連接配接就可以

3、隻要向資料庫發送一次SQL語句和多組參數即可

參考《算子優化 foreachPartition》 https://blog.csdn.net/u013939918/article/details/60881711

http://www.waitingfy.com/archives/4370