天天看點

學習記錄:Scala解析ElasticSearch RestfulAPI傳回的JSON字元串并提取内容轉換成DataFrame

由于需要從傳回封包裡面解析JSON字元串 并提取内容,學習了下相關API用法

需要解析的JSON字元串格式如下所示:

{
  "took": 13,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 3,
    "max_score": 1.2,
    "hits": [
      {
        "_index": "my_store",
        "_type": "products",
        "_id": "2",
        "_score": 1.2,
        "_source": {
          "price": 20,
          "productID": "KDKE-B-9947-#kL5"
        }
      },
      {
        "_index": "my_store",
        "_type": "products",
        "_id": "4",
        "_score": 1.2,
        "_source": {
          "price": 30,
          "productID": "QQPX-R-3956-#aD8"
        }
      }
    ]
  }
}
           

具體代碼如下:

①使用Scala原生包(import scala.util.parsing.json._)裡面的API--*不是很推薦*,這個API在Scala 2.11.0之後被移除了,如果使用Scala 2.11.0及之後的版本會提示API已被廢棄,雖然能運作但是Maven Build會失敗,隻能本地IDE跑。

// 此部分代碼是從main方法裡面提取的代碼片段,不是完整代碼

// 此處是從ElasticSearch請求查詢資料,傳回内容response就是JSON字元串,
// 不過裡面有多層,需要進行解析才能拿到自己想要的那部分
var response: String = Utils.postBody(
        url = s"${esProps.getProperty(s"es.rest")}/${indexName}/_search",
        body = queryString,
        encoding = "utf-8",
        printResponse = true)
// 進行初步解析,傳回内容為Option[Any]
val jsonS = JSON.parseFull(response)
// 調用自定義的regJson方法(在下面貼出代碼)将初步解析内容轉換成Map[String, Any]格式内容
val first = regJson(jsonS)
// 取得最外層hits内容并轉換成Map[String, Any]格式内容
val second = regJson(first.get("hits"))
var strList = List.empty[String]
// 取得内層hits内容并轉為List格式
val list = second.get("hits").get.asInstanceOf[List[Any]]
// 周遊list取出裡面的_source内容并拼接成JSON字元串
for (i <- list) {
    // 取得_source内容
    val source = i.asInstanceOf[Map[String, Any]].get("_source").get
    // 将_source内容轉換成字元串格式
    val jsonString: String = JSONObject.apply(source.asInstanceOf[Map[String, Any]]).toString();
    // 拼接到strList中
    strList = strList :+ jsonString
}
debug("strList:" + strList)
// 将strList轉換成DataFrame
val rddData: RDD[String] = spark.sparkContext.parallelize(strList)
val dataFromEsDf = spark.read.json(rddData)
           

上面代碼片段使用到的regJson方法

def regJson(json:Option[Any]) = json match {
    case Some(map: Map[String, Any]) => map
  }
           

②使用json-smart,可以使用Maven添加依賴或者添加依賴包

代碼如下所示:

// 此部分代碼是從main方法裡面提取的代碼片段,不是完整代碼

// 此處是從ElasticSearch請求查詢資料,傳回内容response就是JSON字元串,
// 不過裡面有多層,需要進行解析才能拿到自己想要的那部分

import net.minidev.json.parser.JSONParser
import net.minidev.json.{JSONArray, JSONObject}

......

var response: String = Utils.postBody(
      url = s"${esProps.getProperty("es.rest")}/bi_enterprise/_search?scroll=1m",
      body = "{\"query\": {\"match_all\": {}},\"size\": 10000,\"sort\": [\"_doc\"]}",
      encoding = "utf-8",
      printResponse = true)
// 設定解析模式為-1,這個模式最快
// smart mode, fastest parsing mode. accept lots of non standard json syntax
val jsonParser = new JSONParser(-1)
// 将内容解析成JSONObject
val jsonObj: JSONObject = jsonParser.parse(response).asInstanceOf[JSONObject]
// 取得外層hits内容并轉換成字元串
val hits = jsonObj.get("hits").toString
// 将外層hits内容解析成JSONObject
val jsonObjHits: JSONObject = jsonParser.parse(hits).asInstanceOf[JSONObject]
// 取得内層hits内容并轉換成字元串
val hitsArray = jsonObjHits.get("hits").toString
// 将内層hits内容解析成JSONArray
val jsonObjSource: JSONArray = jsonParser.parse(hitsArray).asInstanceOf[JSONArray]
hitSize = jsonObjSource.size()
var strList = List.empty[String]
// 将JSONArray轉換成Array并進行周遊
var array = jsonObjSource.toArray()
for (i <- 0 to array.length - 1) {
    val Obj: JSONObject = array(i).asInstanceOf[JSONObject]
    // 取得_source内容并拼接到strList
    strList = strList :+ Obj.get("_source").toString
}
// 轉換成DataFrame
val rddData: RDD[String] = spark.sparkContext.parallelize(strList)
val resultDF = spark.read.json(rddData)
           

雖然還有其他的解析JSON工具包,不過現在暫時就學習了這兩種,第一種被廢棄了,相當于就學習了一種。人們常說舉一反三,學會一種,再學其他應該也不會太難。僅做記錄使用,代碼沒注重太多細節。