While a Spark Thrift Server instance is running, its web UI shows each SQL query's submitting user, the executed SQL statement, and other details.
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHLykFVPJTRE1UeRpHW4Z0MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL3MDOzIDNxkTMyAzNwEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
But once the instance is stopped or terminates abnormally, that information is gone when you look at the Spark History Server web UI.
The root cause is that Spark Thrift Server never serializes this data into the Spark History Server's store; the source code behind that behavior may be worth a separate write-up.
This post takes a compromise approach: patch the server so this data is preserved in a log file.
Modify the org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 class in the spark-hive-thriftserver module as follows:
```scala
def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {
  synchronized {
    executionList(id).finishTimestamp = System.currentTimeMillis
    executionList(id).detail = errorMessage
    executionList(id).state = ExecutionState.FAILED
    totalRunning -= 1
    // added: persist the execution info before it can be trimmed
    SqlListenerUtil.write(executionList(id))
    trimExecutionIfNecessary()
  }
}

def onStatementFinish(id: String): Unit = synchronized {
  executionList(id).finishTimestamp = System.currentTimeMillis
  executionList(id).state = ExecutionState.FINISHED
  totalRunning -= 1
  // added: persist the execution info before it can be trimmed
  SqlListenerUtil.write(executionList(id))
  trimExecutionIfNecessary()
}
```
Also add a new class, org.apache.spark.sql.hive.thriftserver.SqlListenerUtil:
```scala
package org.apache.spark.sql.hive.thriftserver

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import org.apache.spark.internal.Logging
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.{ExecutionInfo, uiTab}
import org.apache.spark.status.api.v1.{JobData, StageData}

import scala.collection.mutable.ArrayBuffer

object SqlListenerUtil extends Logging {

  val mapper: ObjectMapper with ScalaObjectMapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)

  val stagesInfo: ArrayBuffer[StageData] = ArrayBuffer[StageData]()
  val jobsInfo: ArrayBuffer[JobData] = ArrayBuffer[JobData]()

  def write(executionInfo: ExecutionInfo): Unit = synchronized {
    stagesInfo.clear()
    jobsInfo.clear()
    val sparkUI = uiTab.get.parent
    val store = sparkUI.store
    // collect the job and stage data for this statement from the app status store
    executionInfo.jobId.foreach { id =>
      val jobData = store.job(id.toInt)
      jobsInfo += jobData
      jobData.stageIds.foreach { stageId =>
        stagesInfo ++= store.stageData(stageId)
      }
    }
    // serialize everything as one JSON document per statement
    val sqlInfo = SqlInfo(sparkUI.appId, executionInfo, jobsInfo, stagesInfo)
    log.info(mapper.writeValueAsString(sqlInfo))
  }

  case class SqlInfo(
      appId: String,
      executionInfo: ExecutionInfo,
      jobsInfo: ArrayBuffer[JobData],
      stagesInfo: ArrayBuffer[StageData])
}
```
Rebuild the module and replace the corresponding jar.
Then modify log4j.properties under the Spark installation directory and add the following:
```properties
# Custom SQL query monitoring
log4j.logger.org.apache.spark.sql.hive.thriftserver.SqlListenerUtil=INFO,listener
log4j.additivity.org.apache.spark.sql.hive.thriftserver.SqlListenerUtil=false
log4j.appender.listener=org.apache.log4j.DailyRollingFileAppender
log4j.appender.listener.File=/var/log/spark2/spark-sql-listener
log4j.appender.listener.layout=org.apache.log4j.PatternLayout
log4j.appender.listener.layout.ConversionPattern=%m%n
log4j.appender.listener.DatePattern=.yyyy-MM-dd
```
Finally, restart the Spark Thrift Server. A simple restart command (with Kerberos authentication, no dynamic resource allocation, and some tuned parameters):
```shell
./start-thriftserver.sh \
  --hiveconf hive.server2.authentication.kerberos.principal=hive/[email protected] \
  --hiveconf hive.server2.authentication.kerberos.keytab=/home/zmbd-uat-presto87.keytab \
  --hiveconf hive.server2.thrift.port=10001 \
  --conf spark.network.timeout=600 \
  --conf spark.sql.autoBroadcastJoinThreshold=-1 \
  --queue root.zm_yarn_pool.development \
  --master yarn --executor-memory 4g --executor-cores 2 --num-executors 8
```
With this in place, query logs are recorded in /var/log/spark2/spark-sql-listener. Because the conversion pattern is %m%n, each Spark SQL execution lands as a single bare JSON document per line, and the DailyRollingFileAppender rolls the file daily with a .yyyy-MM-dd suffix.
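Since each log line is one self-contained JSON document mirroring the SqlInfo case class above (appId, executionInfo, jobsInfo, stagesInfo), the file is easy to post-process. Below is a minimal sketch of pulling out failed statements; the sample records and the field names inside executionInfo (state, detail, finishTimestamp, taken from the patch above) are illustrative, and the exact shape in a real log depends on how Jackson serializes Spark's ExecutionInfo class.

```python
import json

# Hypothetical log lines shaped like the SqlInfo case class; real records
# will carry more executionInfo fields than shown here.
sample_lines = [
    '{"appId": "application_1_0001", "executionInfo": {"state": "FINISHED",'
    ' "finishTimestamp": 1700000000000}, "jobsInfo": [], "stagesInfo": []}',
    '{"appId": "application_1_0001", "executionInfo": {"state": "FAILED",'
    ' "detail": "table not found", "finishTimestamp": 1700000060000},'
    ' "jobsInfo": [], "stagesInfo": []}',
]

def failed_statements(lines):
    """Return the executionInfo of every FAILED statement in a JSON-lines log."""
    failed = []
    for line in lines:
        record = json.loads(line)
        if record["executionInfo"].get("state") == "FAILED":
            failed.append(record["executionInfo"])
    return failed

failures = failed_statements(sample_lines)
print(len(failures))          # 1
print(failures[0]["detail"])  # table not found
```

In production you would read the lines from the rolled log files instead of an in-memory list, but the per-line json.loads pattern stays the same.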