天天看點

使用SparkSql 讀取ES資料

  1. 對于大批量資料,查詢es時,需要帶條件去查詢,否則一下查出所有資料資料量會很大
  2. es查詢需要編寫json格式的DSL查詢語句,對于複雜查詢,DSL編寫起來也分很複雜,是以我們這裡使用sparksql,通過編寫sql語句,spark将sql語句自動轉化為DSL語句來查詢es。

    關于es對spark的支援可參見文檔,官網

    https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html

廢話不多說,上代碼

case class people(phone:String)

val sparkConf = new SparkConf().setAppName("application-name").setMaster("local[4]")

val sc = new SparkContext(sparkConf)

val sql = new SQLContext(sc)

import sql.implicits._

//配置

val options = Map(

"pushdown" -> "true",
    "es.nodes" -> "192.45.15.15", //ip
    "es.port" -> "9200") //端口           

val esDF = sql.read.format("org.elasticsearch.spark.sql")

.options(options)
      .load("spark_202003/alldata")  //索引
    // esDF.printSchema() //可列印schema資訊           

val table = esDF.createOrReplaceTempView("spark_202003") //建立臨時表用于查詢

val sqlString=“select * from spark_202003 where code=‘1’ ”

val resultDF = sql.sql(sqlString)

var resultDS = resultDF.map(x => people(x(0).toString))) //

這裡是轉換成了dataset,之後可對dataset 做其他操作

列印下查詢的内容

resultDS.collect().toList.toString()

繼續閱讀