文章目录
- es数据
- pyspark读取es解析json (包含循环数组):
- 日志
es数据
{
"_index": "freyr-20210315",
"_type": "serviceInvoke",
"_id": "AXg98PySCQeiakUv0e4k",
"_score": 36.110207,
"_source": {
"msg": "成功",
"invokeType": 2,
"cost": 309,
"invokeSource": 1,
"tokenId": "3d854363ae749d1a4e8d00451b1a8f86",
"docId": "vyHjSULoZiKkcpgNMuBfxBPgKclcecPRCKPDlKVB",
"serviceDisplayName": "xxxx信息平台-个人",
"requestParam": {
"headers": {
},
"bodyParam": {
},
"outBody": "",
"urlParam": {
"documentNo": "131081198308**1074",
"appid": "722433dac0b421987162276f0c1e0367",
"name": "张*平"
}
},
"serviceName": "lawEnforcement_person",
"sequenceId": "1615946906356A11299",
"businessServiceName": "个贷系统",
"requestTime": 1615946906363,
"businessCode": "channel3",
"processData": {
"data": "{\"code\":\"200\",\"data\":{\"pagination\":{\"total\":8,\"pageCount\":1,\"previousPage\":1,\"nextPage\":1,\"currentCount\":8,\"currentPage\":1},\"items\":{\"sfss_xzxf\":[{\"punishmentOrgan\":\"西西市人民法院\",\"dataKeyId\":\"6fba119210fb44ec3b96dd05898f5669\",\"sex\":\"男\",\"implementation\":\"\",\"dataType\":\"64\",\"cause\":\"本院于2019年04月09日立案执行申请人王同生申请执行你借款合同纠纷一案,因你未按执行通知书指定的期间履行生效法律文书确定的给付义务\",\"remark\":\"\",\"idNumber\":\"131081198308**1074\",\"filingTime\":\"2019-04-09\",\"caseNumber\":\"(2019)冀1081执827号\",\"name\":\"张*平\",\"zqr\":\"\",\"uscCode\":\"\",\"bzxr\":\"\"},{\"punishmentOrgan\":\"西西市人民法院\",\"dataKeyId\":\"4f5ac433e126fe058b955a6b59874102\",\"sex\":\"男\",\"implementation\":\"\",\"dataType\":\"64\",\"cause\":\"本院于2019年04月09日立案执行申请人王同生申请执行你借款合同纠纷一案,因你未按执行通知书指定的期间履行生效法律文书确定的给付义务\",\"remark\":\"\",\"idNumber\":\"131081198308**1074\",\"filingTime\":\"2019-04-09\",\"caseNumber\":\"(2019)冀1081执826号\",\"name\":\"张*平\",\"zqr\":\"\",\"uscCode\":\"\",\"bzxr\":\"\"}],\"hjqk_hjqk\":[],\"swcf_zdss\":[],\"gxyd_xzcf\":[],\"zcxx_ldzc\":[]}},\"success\":\"true\",\"message\":\"交易成功\"}"
},
"recordTime": 1615946906678,
"organizationCode": "tsbankyyglb",
"success": true,
"contractId": "8da17650f014443cbccd6cb1c53ff50e",
"createdTime": "2021-03-17 10:08:26",
"recordDate": "20210317",
"status": 1
}
}
processData.data(即上面 `_source.processData.data` 字符串反转义并格式化后的内容)
{
"code": "200",
"data": {
"pagination": {
"total": 8,
"pageCount": 1,
"previousPage": 1,
"nextPage": 1,
"currentCount": 8,
"currentPage": 1
},
"items": {
"sfss_xzxf": [
{
"punishmentOrgan": "西西市人民法院",
"dataKeyId": "6fba119210fb44ec3b96dd05898f5669",
"sex": "男",
"implementation": "",
"dataType": "64",
"cause": "本院于2019年04月09日立案执行申请人王同生申请执行你借款合同纠纷一案,因你未按执行通知书指定的期间履行生效法律文书确定的给付义务",
"remark": "",
"idNumber": "131081198308**1074",
"filingTime": "2019-04-09",
"caseNumber": "(2019)冀1081执827号",
"name": "张*平",
"zqr": "",
"uscCode": "",
"bzxr": ""
},
{
"punishmentOrgan": "西西市人民法院",
"dataKeyId": "4f5ac433e126fe058b955a6b59874102",
"sex": "男",
"implementation": "",
"dataType": "64",
"cause": "本院于2019年04月09日立案执行申请人王同生申请执行你借款合同纠纷一案,因你未按执行通知书指定的期间履行生效法律文书确定的给付义务",
"remark": "",
"idNumber": "131081198308**1074",
"filingTime": "2019-04-09",
"caseNumber": "(2019)冀1081执826号",
"name": "张*平",
"zqr": "",
"uscCode": "",
"bzxr": ""
}
],
"hjqk_hjqk": [
],
"swcf_zdss": [
],
"gxyd_xzcf": [
],
"zcxx_ldzc": [
]
}
},
"success": "true",
"message": "交易成功"
}
pyspark读取es解析json (包含循环数组):
#!/bin/python3
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from datetime import date
from datetime import datetime,timedelta
import json,os
def main(sparkSession):
    """Read service-invocation records from Elasticsearch and explode ``sfss_xzxf``.

    Steps performed, in order:

    1. Load the ``freyr-20210315/serviceInvoke`` resource through the
       ``org.elasticsearch.spark.sql`` data source and register it as the
       temp view ``tdl_qm_stration_law_per_dt``.
    2. Select the JSON payload column plus a few envelope fields with SQL.
    3. Pull ``code``, ``message`` and two ``items`` arrays out of the JSON
       string with ``get_json_object`` and register the result as
       ``tdl_qm_stration_law_per_dt1``.
    4. Parse the ``sfss_xzxf`` JSON array with ``from_json`` and ``explode``
       it so each array element becomes its own row.

    Intermediate and final DataFrames are printed with ``show()``; nothing is
    returned.

    Args:
        sparkSession: Object exposing ``.read`` and ``.sql``; presumably a
            ``pyspark.sql.SparkSession`` (TODO confirm against the caller).

    NOTE(review): ``port`` and ``addContion`` are read as free names and are
    not defined in the visible code -- they must exist at module level.
    """
    # Read the ES data and register it as a temp view.
    df = (
        sparkSession.read
        .format("org.elasticsearch.spark.sql")
        .option("es.nodes", '197.4.218.113')
        .option('es.port', str(port))
        .option("es.resource", 'freyr-20210315/serviceInvoke')
        .option('es.query', '{"query":{"bool":{"must":[{"match":{"serviceName":"lawEnforcement_person"}}, {"match":{"invokeType":"2"}}]}}}')
        .option('es.nodes.wan.only', 'true')
        .option("es.nodes.discovery", "false")
        .option("es.index.auto.create", "true")
        .option("es.read.field.exclude", "requestParam.headers")
        .option("es.write.ignore_exception", "true")
        .option("es.read.ignore_exception", "true")
        .load()
    )
    # createOrReplaceTempView instead of createTempView: the latter raises if
    # the view name is already registered, which breaks a second call to
    # main() within the same Spark session.
    df.createOrReplaceTempView('tdl_qm_stration_law_per_dt')
    df.show()

    # Query the temp view.
    # NOTE(review): this selects `responseData.data`, while the sample ES
    # document accompanying this script stores the payload under
    # `processData.data` -- confirm which field the index really has.
    result = sparkSession.sql("""
        SELECT
        responseData.data,
        sequenceId,
        serviceName,
        errorCode,
        createdTime,
        recordDate,
        organizationCode,
        businessCode,
        requestParam.urlParam.name,
        requestParam.urlParam.documentNo
        FROM
        tdl_qm_stration_law_per_dt
        WHERE
        serviceName = 'lawEnforcement_person'
        AND errorCode = '10000'
        AND invokeType = '2' AND sequenceId ='1615946906356A11299'
        """ + addContion)

    # Disabled: write the result into a pre-created HDFS directory
    # (`tdate` is not defined in the visible code).
    # result.write.mode("append").format('json').save("/user/datacompute/users/xin.zhou/elasticData/"+tdate)

    # Extract individual values from the JSON string held in column `data`.
    res = result.select(
        "serviceName", "createdTime", "recordDate", "organizationCode",
        "businessCode", "errorCode", "name", "documentNo", "sequenceId",
        F.get_json_object("data", "$.code").alias("code"),
        F.get_json_object("data", "$.message").alias("message"),
        F.get_json_object("data", "$.data.items.sfss_xzxf").alias("sfss_xzxf"),
        F.get_json_object("data", "$.data.items.hjqk_hjqk").alias("hjqk_hjqk"),
    )
    res.show()
    res.createOrReplaceTempView('tdl_qm_stration_law_per_dt1')

    # Expand the sfss_xzxf array: one output row per array element.
    sfss_xzxf = sparkSession.sql("""select sequenceId,sfss_xzxf from tdl_qm_stration_law_per_dt1 where sfss_xzxf is not null""")
    # NOTE(review): from_json yields null for schema fields that are absent
    # from the JSON. The sample sfss_xzxf elements carry `cause` and
    # `filingTime` and have no `legalPerson`, so punishCause / penaltyTime /
    # legalPerson come out null -- confirm the intended field mapping.
    schema = """array<struct<
        punishCause :string,
        punishmentOrgan :string,
        penaltyTime :string,
        caseNumber :string,
        dataKeyId :string,
        legalPerson :string,
        dataType :string,
        name :string,
        remark :string,
        uscCode :string,
        idNumber :string
        >>"""
    scjg_xzxf_rds = (
        sfss_xzxf
        .withColumn("sfss_xzxf", F.from_json(F.col("sfss_xzxf"), schema))
        .withColumn("expl", F.explode("sfss_xzxf"))
        .select(sfss_xzxf.sequenceId, "expl.*")
    )
    scjg_xzxf_rds.show()

    # Disabled: load the exploded rows into the partitioned target table.
    # scjg_xzxf_rds.createTempView("tdl_scjg_xzcf_dt")
    # sparkSession.sql("""insert into bigdata.per_ms_pther_penalties_dt partition (ds="""+tdate+""")
    # select
    # sequenceId as sequence_id,
    # name as customer_name,
    # uscCode as usc_code,
    # legalPerson as legal_person,
    # idNumber as id_number,
    # caseNumber as case_number,
    # punishmentOrgan as punishment_organ,
    # penaltyTime as penalty_time,
    # '',
    # punishCause as punish_cause,
    # '',
    # '',
    # remark as remark,
    # dataType as data_type,
    # dataKeyId as data_key_id
    # from tdl_scjg_xzcf_dt""")
日志
[2021-05-16 18:44:02.236] [INFO] - spark status:Job5/stage5(1/1); Job6/stage6(1/1); Job7/stage7(2/2); Job8/stage8(0/1)
[2021-05-16 18:44:02.941] [INFO] - +--------------------+-------------------+----------+----------------+------------+---------+------+------------------+-------------------+----+--------+-----------------------+---------+----+---------+-----+
| serviceName| createdTime|recordDate|organizationCode|businessCode|errorCode| name| documentNo| sequenceId|code| message| sfss_xzxf|blgy_blgy|qtcf|sfss_gscg|zyfdr|
+--------------------+-------------------+----------+----------------+------------+---------+------+------------------+-------------------+----+--------+-----------------------+---------+----+---------+-----+
|lawEnforcement_pe...|2021-03-17 10:08:26| 20210317| tsbankyyglb| channel3| 10000|张*平|131081198308**1074|1615946906356A11299| 200|交易成功|[{"name":"张*平","b...| []| []| null| []|
+--------------------+-------------------+----------+----------------+------------+---------+------+------------------+-------------------+----+--------+-----------------------+---------+----+---------+-----+
[2021-05-16 18:44:02.942] [INFO] - create temp table: tdl_qm_stration_law_per_dt1
[2021-05-16 18:44:02.942] [INFO] - check authority success
[2021-05-16 18:44:02.942] [INFO] - executing sql: select sequenceId,sfss_xzxf from tdl_qm_stration_law_per_dt1 where sfss_xzxf is not null
[2021-05-16 18:44:03.647] [INFO] - spark status:Job5/stage5(1/1); Job6/stage6(1/1); Job7/stage7(2/2); Job8/stage8(1/1); Job9/stage9(2/2); Job10/stage10(1/1); Job11/stage11(0/2)
[2021-05-16 18:44:03.647] [INFO] - +-------------------+-----------+---------------+-----------+---------------------+--------------------+-----------+--------+------+------+-------+------------------+
| sequenceId|punishCause|punishmentOrgan|penaltyTime| caseNumber| dataKeyId|legalPerson|dataType| name|remark|uscCode| idNumber|
+-------------------+-----------+---------------+-----------+---------------------+--------------------+-----------+--------+------+------+-------+------------------+
|1615946906356A11299| null| 西西市人民法院| null|(2019)冀1081执827号|6fba119210fb44ec3...| null| 64|张*平| | |131081198308**1074|
|1615946906356A11299| null| 西西市人民法院| null|(2019)冀1081执826号|4f5ac433e126fe058...| null| 64|张*平| | |131081198308**1074|
+-------------------+-----------+---------------+-----------+---------------------+--------------------+-----------+--------+------+------+-------+------------------+