
Oracle to SparkSQL as a tool: reading Oracle data with SparkSQL and writing it to Kafka

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object Conn_Oracle {

  def main(args: Array[String]): Unit = {

    // Connect Spark to the Oracle database
    // Build the SparkConf
    val conf = new SparkConf()
      .setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setAppName(this.getClass.getName)

    // Build the SparkSession
    val session = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()

    val sc = session.sparkContext

    // -------------------------------- Connect to the Oracle database ----------------------------------
    val reader = session.read.format("jdbc")
      .option("url", "jdbc:oracle:thin:@10.10.1.253:1521:bidwdev")
      .option("driver", "oracle.jdbc.driver.OracleDriver")
      .option("user", "user")
      .option("password", "password")
      .option("dbtable", "table_name")

    val jdbcDf: DataFrame = reader.load()
    jdbcDf.show()
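    // The single-connection read above pulls the whole table through one JDBC session.
    // For large tables, Spark's built-in JDBC partitioning options can parallelize the read.
    // A minimal commented-out sketch, assuming the table has a numeric key column named "ID"
    // (the column name and bounds below are placeholders, not from the original post):
    // val partitionedDf = session.read.format("jdbc")
    //   .option("url", "jdbc:oracle:thin:@10.10.1.253:1521:bidwdev")
    //   .option("driver", "oracle.jdbc.driver.OracleDriver")
    //   .option("user", "user")
    //   .option("password", "password")
    //   .option("dbtable", "table_name")
    //   .option("partitionColumn", "ID")   // assumed numeric column to split on
    //   .option("lowerBound", "1")         // approximate min of ID
    //   .option("upperBound", "1000000")   // approximate max of ID
    //   .option("numPartitions", "8")      // number of parallel JDBC connections
    //   .option("fetchsize", "1000")       // rows fetched per round trip
    //   .load()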

    // ---------------------------------- Broadcast variable: Kafka producer sink ----------------------------------
    // KafkaSink is a user-defined serializable wrapper around KafkaProducer; a sketch is given after this object.
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", "10.133.232.57:9092")
        // p.setProperty("zookeeper.connect", "10.133.232.57:9093")
        p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        // p.setProperty("partitioner.class", "com.buba.kafka.producer.CustomPartitioner")
        p.put("acks", "all")
        p.put("retries", "0")
        p.put("delivery.timeout.ms", "30001")
        p.put("request.timeout.ms", "30000")
        // Linger time: how long the producer waits before sending a batch
        p.put("linger.ms", "1")
        p
      }
      sc.broadcast(KafkaSink[String, String](kafkaProducerConfig))
    }

    // Send every row read from Oracle to the Oracle_test topic
    // (a variant that sends rows as JSON is sketched at the end of this post)
    jdbcDf.foreachPartition((partition: Iterator[Row]) => {
      partition.foreach(row => {
        kafkaProducer.value.send("Oracle_test", row.toString)
      })
    })

    // jdbcDf.show()

    session.stop() // also stops the underlying SparkContext
  }
}
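The code broadcasts a KafkaSink[String, String], but that class is neither part of Spark nor of the Kafka client library, and its definition is not shown in the post. Below is a minimal sketch of the serializable, lazily initialized producer wrapper the code appears to assume; the class and method names mirror the calls made above (KafkaSink[K, V](config) and send(topic, value)), everything else is an assumption.

import java.util.Properties
import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

// Serializable wrapper around KafkaProducer: only the factory function is shipped to the
// executors; the producer itself is created lazily on first use inside each executor JVM.
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  lazy val producer: KafkaProducer[K, V] = createProducer()

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))
}

object KafkaSink {
  def apply[K, V](config: Properties): KafkaSink[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      // Flush and close the producer when the executor JVM shuts down
      sys.addShutdownHook {
        producer.close()
      }
      producer
    }
    new KafkaSink(createProducerFunc)
  }
}

The point of this design is that KafkaProducer is not serializable: broadcasting a factory function and building the producer lazily on each executor avoids serialization errors and avoids creating a new producer per record.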

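One more note: row.toString produces the default Row rendering ([value1,value2,...]), which downstream consumers may find awkward to parse. If JSON is acceptable on the topic, a hedged alternative for the foreachPartition block in main, using the standard Dataset.toJSON conversion:

    // Convert every row to a JSON string and send it to the Oracle_test topic
    jdbcDf.toJSON.foreachPartition((partition: Iterator[String]) => {
      partition.foreach(json => {
        kafkaProducer.value.send("Oracle_test", json)
      })
    })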