import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object Conn_Oracle {
def main(args: Array[String]): Unit = {
// Read rows from an Oracle database via JDBC and publish them to Kafka
// Build the SparkConf
val conf = new SparkConf()
.setMaster("local[*]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.setAppName(this.getClass.getName)
// Build the SparkSession
val session = SparkSession
.builder()
.config(conf)
.getOrCreate()
val sc = session.sparkContext
// -------------------------------- Connect to the Oracle database --------------------------------
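// Thin-driver URL format: jdbc:oracle:thin:@host:port:SID.
// "dbtable" accepts either a table name or a parenthesized subquery with an alias.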
val reader = session.read.format("jdbc")
.option("url", "jdbc:oracle:thin:@10.10.1.253:1521:bidwdev")
.option("driver", "oracle.jdbc.driver.OracleDriver")
.option("user", "user")
.option("password", "password")
.option("dbtable", "表名字")
val jdbcDf: DataFrame = reader.load()
jdbcDf.show()
// ---------------------------------- Broadcast variable ----------------------------------
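// A KafkaProducer is not serializable, so it cannot be shipped to executors directly.
// Instead, broadcast a serializable KafkaSink wrapper (sketched after this object)
// that creates one producer lazily per executor.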
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "10.133.232.57:9092")
    // p.setProperty("zookeeper.connect", "10.133.232.57:9093")
    p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // p.setProperty("partitioner.class", "com.buba.kafka.producer.CustomPartitioner")
    p.put("acks", "all")
    p.put("retries", "0")
    // delivery.timeout.ms must be >= request.timeout.ms + linger.ms
    p.put("delivery.timeout.ms", "30001")
    p.put("request.timeout.ms", "30000")
    // How long the producer waits to batch records before sending
    p.put("linger.ms", "1")
    p
  }
  sc.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}
// Send every row read from Oracle to the Oracle_test topic through the broadcast producer
jdbcDf.foreachPartition((partition: Iterator[Row]) => {
  // The producer is created lazily inside KafkaSink, once per executor
  partition.foreach(row => {
    kafkaProducer.value.send("Oracle_test", row.toString)
  })
})
// session.stop() also stops the underlying SparkContext, so a separate sc.stop() is unnecessary
session.stop()
}
}
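
// The KafkaSink wrapper used above is referenced but not defined in the original
// listing. Below is a minimal sketch of the common broadcast-producer pattern:
// since KafkaProducer is not serializable, we broadcast a serializable wrapper
// holding a factory function and let each executor build its producer lazily.
// Class and method names here are assumptions chosen to match the call sites above.
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  // Created on first use, once per executor JVM
  lazy val producer: KafkaProducer[K, V] = createProducer()

  def send(topic: String, value: V): java.util.concurrent.Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))
}

object KafkaSink {
  def apply[K, V](config: Properties): KafkaSink[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      // Flush and close the producer when the executor JVM shuts down
      sys.addShutdownHook {
        producer.close()
      }
      producer
    }
    new KafkaSink(createProducerFunc)
  }
}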
