由于需要從傳回封包裡面解析JSON字元串 并提取内容,學習了下相關API用法
需要解析的JSON字元串格式如下所示:
{
"took": 13,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 3,
"max_score": 1.2,
"hits": [
{
"_index": "my_store",
"_type": "products",
"_id": "2",
"_score": 1.2,
"_source": {
"price": 20,
"productID": "KDKE-B-9947-#kL5"
}
},
{
"_index": "my_store",
"_type": "products",
"_id": "4",
"_score": 1.2,
"_source": {
"price": 30,
"productID": "QQPX-R-3956-#aD8"
}
}
]
}
}
具體代碼如下:
①使用Scala原生包(import scala.util.parsing.json._)裡面的API--*不是很推薦*,這個API在Scala 2.11.0之後被移除了,如果使用Scala 2.11.0及之後的版本會提示API已被廢棄,雖然能運作但是Maven Build會失敗,隻能本地IDE跑。
// 此部分代碼是從main方法裡面提取的代碼片段,不是完整代碼
// 此處是從ElasticSearch請求查詢資料,傳回内容response就是JSON字元串,
// 不過裡面有多層,需要進行解析才能拿到自己想要的那部分
var response: String = Utils.postBody(
url = s"${esProps.getProperty(s"es.rest")}/${indexName}/_search",
body = queryString,
encoding = "utf-8",
printResponse = true)
// 進行初步解析,傳回内容為Option[Any]
val jsonS = JSON.parseFull(response)
// 調用自定義的regJson方法(在下面貼出代碼)将初步解析内容轉換成Map[String, Any]格式内容
val first = regJson(jsonS)
// 取得最外層hits内容并轉換成Map[String, Any]格式内容
val second = regJson(first.get("hits"))
var strList = List.empty[String]
// 取得内層hits内容并轉為List格式
val list = second.get("hits").get.asInstanceOf[List[Any]]
// 周遊list取出裡面的_source内容并拼接成JSON字元串
for (i <- list) {
// 取得_source内容
val source = i.asInstanceOf[Map[String, Any]].get("_source").get
// 将_source内容轉換成字元串格式
val jsonString: String = JSONObject.apply(source.asInstanceOf[Map[String, Any]]).toString();
// 拼接到strList中
strList = strList :+ jsonString
}
debug("strList:" + strList)
// 将strList轉換成DataFrame
val rddData: RDD[String] = spark.sparkContext.parallelize(strList)
val dataFromEsDf = spark.read.json(rddData)
上面代碼片段使用到的regJson方法
def regJson(json:Option[Any]) = json match {
case Some(map: Map[String, Any]) => map
}
②使用json-smart,可以使用Maven添加依賴或者添加依賴包
代碼如下所示:
// 此部分代碼是從main方法裡面提取的代碼片段,不是完整代碼
// 此處是從ElasticSearch請求查詢資料,傳回内容response就是JSON字元串,
// 不過裡面有多層,需要進行解析才能拿到自己想要的那部分
import net.minidev.json.parser.JSONParser
import net.minidev.json.{JSONArray, JSONObject}
......
var response: String = Utils.postBody(
url = s"${esProps.getProperty("es.rest")}/bi_enterprise/_search?scroll=1m",
body = "{\"query\": {\"match_all\": {}},\"size\": 10000,\"sort\": [\"_doc\"]}",
encoding = "utf-8",
printResponse = true)
// 設定解析模式為-1,這個模式最快
// smart mode, fastest parsing mode. accept lots of non standard json syntax
val jsonParser = new JSONParser(-1)
// 将内容解析成JSONObject
val jsonObj: JSONObject = jsonParser.parse(response).asInstanceOf[JSONObject]
// 取得外層hits内容并轉換成字元串
val hits = jsonObj.get("hits").toString
// 将外層hits内容解析成JSONObject
val jsonObjHits: JSONObject = jsonParser.parse(hits).asInstanceOf[JSONObject]
// 取得内層hits内容并轉換成字元串
val hitsArray = jsonObjHits.get("hits").toString
// 将内層hits内容解析成JSONArray
val jsonObjSource: JSONArray = jsonParser.parse(hitsArray).asInstanceOf[JSONArray]
hitSize = jsonObjSource.size()
var strList = List.empty[String]
// 将JSONArray轉換成Array并進行周遊
var array = jsonObjSource.toArray()
for (i <- 0 to array.length - 1) {
val Obj: JSONObject = array(i).asInstanceOf[JSONObject]
// 取得_source内容并拼接到strList
strList = strList :+ Obj.get("_source").toString
}
// 轉換成DataFrame
val rddData: RDD[String] = spark.sparkContext.parallelize(strList)
val resultDF = spark.read.json(rddData)
雖然還有其他的解析JSON工具包,不過現在暫時就學習了這兩種,第一種被廢棄了,相當于就學習了一種。人們常說舉一反三,學會一種,再學其他應該也不會太難。僅做記錄使用,代碼沒注重太多細節。