

title: Spark DateType/Timestamp Cast Notes

date: 2018-07-19 16:47:39

tags:

  • Spark

Preface

In everyday Spark work we often need to take a date value such as 2012-12-12, convert it to a long Unix time, and then do arithmetic on it. Here is a sample snippet:

import java.sql.{Date, Timestamp}

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = StructType(
  Array(
    StructField("id", IntegerType, nullable = true),
    StructField("birth", DateType, nullable = true),
    StructField("time", TimestampType, nullable = true)
  ))

val data = Seq(
  Row(1, Date.valueOf("2012-12-12"), Timestamp.valueOf("2016-09-30 03:03:00")),
  Row(2, Date.valueOf("2016-12-14"), Timestamp.valueOf("2016-12-14 03:03:00")))

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

Problem & Solution

The most intuitive first attempt is to cast the DateType column directly to LongType, like this:

df.select(df.col("birth").cast(LongType))

But every value comes back as null. Why? The answer lies in

org.apache.spark.sql.catalyst.expressions.Cast

Look at the canCast method first: DateType can in fact be cast to NumericType. Then look at the castToLong method below, where

case DateType => buildCast[Int](_, d => null)

simply returns null. The commit history shows this behaviour went back and forth, and in the end it returns null to stay consistent with Hive.
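A quick way to see this (a minimal sketch, reusing the df built above):

// Every row comes back as null once the DateType column is cast straight to Long.
df.select(df.col("birth").cast(LongType)).show()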

Although DateType cannot be cast to Long directly, TimestampType can, so the workaround is to cast DateType to TimestampType first. This, however, runs straight into a really nasty problem: time zones.

First, let's settle one question: is the 2012-12-12 we put into Spark interpreted as UTC or as our current time zone? The answer is that, without any special configuration, 2012-12-12 means 2012-12-12 00:00:00 in the current time zone, which in UTC is actually 2012-12-11 16:00:00, 8 hours earlier (this article assumes a GMT+8 machine). Incidentally, this also shows that Spark applies the time zone when ingesting Date data.
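A quick check (a minimal sketch, assuming a GMT+8 machine with no extra time zone configuration, reusing the df defined above):

// The epoch seconds correspond to 2012-12-11 16:00:00 UTC,
// i.e. 2012-12-12 00:00:00 GMT+8, not midnight UTC.
df.select(df.col("birth").cast(TimestampType).cast(LongType)).head().getLong(0)
// => 1355241600L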

Now look at the code that casts DateType to TimestampType:

buildCast[Int](_, d => DateTimeUtils.daysToMillis(d, timeZone) * 1000)

The time zone is involved here, and Spark SQL defaults to the current machine's time zone. In practice, though, the underlying data, such as the 2016-09-30 above, usually represents UTC; it stays UTC while Spark processes it, and only turns into a value in the target time zone when it leaves through JDBC. After some digging, there are two solutions:

  1. Configure Spark's default time zone:

    config("spark.sql.session.timeZone", "UTC")

    This is the most straightforward option. Then

    df.select(df.col("birth").cast(TimestampType).cast(LongType))

    just works.
  2. Leave the conf alone and tackle it head on:

    df.select(from_utc_timestamp(to_utc_timestamp(df.col("birth"), TimeZone.getTimeZone("UTC").getID), TimeZone.getDefault.getID).cast(LongType))

    Note all the casts. Here is the difference:
  • Without the UTC config:

    from_utc_timestamp(to_utc_timestamp(lit("2012-12-11 16:00:00"), TimeZone.getTimeZone("UTC").getID), TimeZone.getDefault.getID)

  • With the UTC config:

    from_utc_timestamp(to_utc_timestamp(lit("2012-12-12 00:00:00"), TimeZone.getTimeZone("UTC").getID), TimeZone.getDefault.getID)

    8 hours more. For reference, here are the relevant snippets from Cast and the datetime functions:
/**
   * Returns true iff we can cast `from` type to `to` type.
   */
  def canCast(from: DataType, to: DataType): Boolean = (from, to) match {
    case (fromType, toType) if fromType == toType => true

    case (NullType, _) => true

    case (_, StringType) => true

    case (StringType, BinaryType) => true

    case (StringType, BooleanType) => true
    case (DateType, BooleanType) => true
    case (TimestampType, BooleanType) => true
    case (_: NumericType, BooleanType) => true

    case (StringType, TimestampType) => true
    case (BooleanType, TimestampType) => true
    case (DateType, TimestampType) => true
    case (_: NumericType, TimestampType) => true

    case (StringType, DateType) => true
    case (TimestampType, DateType) => true

    case (StringType, CalendarIntervalType) => true

    case (StringType, _: NumericType) => true
    case (BooleanType, _: NumericType) => true
    case (DateType, _: NumericType) => true
    case (TimestampType, _: NumericType) => true
    case (_: NumericType, _: NumericType) => true
    ...
  }
           
private[this] def castToLong(from: DataType): Any => Any = from match {
    case StringType =>
      val result = new LongWrapper()
      buildCast[UTF8String](_, s => if (s.toLong(result)) result.value else null)
    case BooleanType =>
      buildCast[Boolean](_, b => if (b) 1L else 0L)
    case DateType =>
      buildCast[Int](_, d => null)
    case TimestampType =>
      buildCast[Long](_, t => timestampToLong(t))
    case x: NumericType =>
      b => x.numeric.asInstanceOf[Numeric[Any]].toLong(b)
  }           
// TimestampConverter
  private[this] def castToTimestamp(from: DataType): Any => Any = from match {
    ...
    case DateType =>
      buildCast[Int](_, d => DateTimeUtils.daysToMillis(d, timeZone) * 1000)
    // TimestampWritable.decimalToTimestamp
    ...
  }           
/**
   * Given a timestamp, which corresponds to a certain time of day in the given timezone, returns
   * another timestamp that corresponds to the same time of day in UTC.
   * @group datetime_funcs
   * @since 1.5.0
   */
  def to_utc_timestamp(ts: Column, tz: String): Column = withExpr {
    ToUTCTimestamp(ts.expr, Literal(tz))
  }

  /**
   * Given a timestamp, which corresponds to a certain time of day in UTC, returns another timestamp
   * that corresponds to the same time of day in the given timezone.
   * @group datetime_funcs
   * @since 1.5.0
   */
  def from_utc_timestamp(ts: Column, tz: String): Column = withExpr {
    FromUTCTimestamp(ts.expr, Literal(tz))
  }           

Deep dive

Reading the configuration source:

val SESSION_LOCAL_TIMEZONE = buildConf("spark.sql.session.timeZone")
  .stringConf
  .createWithDefaultFunction(() => TimeZone.getDefault.getID)

def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE)

/**
 * Replace [[TimeZoneAwareExpression]] without timezone id by its copy with session local
 * time zone.
 */
case class ResolveTimeZone(conf: SQLConf) extends Rule[LogicalPlan] {
  private val transformTimeZoneExprs: PartialFunction[Expression, Expression] = {
    case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty =>
      e.withTimeZone(conf.sessionLocalTimeZone)
    // Casts could be added in the subquery plan through the rule TypeCoercion while coercing
    // the types between the value expression and list query expression of IN expression.
    // We need to subject the subquery plan through ResolveTimeZone again to setup timezone
    // information for time zone aware expressions.
    case e: ListQuery => e.withNewPlan(apply(e.plan))
  }

  override def apply(plan: LogicalPlan): LogicalPlan =
    plan.transformAllExpressions(transformTimeZoneExprs)

  def resolveTimeZones(e: Expression): Expression = e.transform(transformTimeZoneExprs)
}

/**
 * Mix-in trait for constructing valid [[Cast]] expressions.
 */
trait CastSupport {
  /**
   * Configuration used to create a valid cast expression.
   */
  def conf: SQLConf

  /**
   * Create a Cast expression with the session local time zone.
   */
  def cast(child: Expression, dataType: DataType): Cast = {
    Cast(child, dataType, Option(conf.sessionLocalTimeZone))
  }
}           
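For completeness, two ways to set this config (a small sketch; only the spark.sql.session.timeZone key comes from the definition above, the rest is illustrative):

import org.apache.spark.sql.SparkSession

// At session build time:
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.session.timeZone", "UTC")
  .getOrCreate()

// Or at runtime, since it is a per-session SQLConf entry:
spark.conf.set("spark.sql.session.timeZone", "UTC")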

In org.apache.spark.sql.catalyst.analysis.Analyzer#batches you can see

ResolveTimeZone

lazy val batches: Seq[Batch] = Seq(

    Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions ::
      ResolveRelations ::
      ResolveReferences ::
      ...
      ResolveTimeZone(conf) ::
      ResolvedUuidExpressions ::
      TypeCoercion.typeCoercionRules(conf) ++
      extendedResolutionRules : _*),
    Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
    Batch("View", Once,
      AliasViewChild(conf)),
    Batch("Nondeterministic", Once,
      PullOutNondeterministic),
    Batch("UDF", Once,
      HandleNullInputsForUDF),
    Batch("FixNullability", Once,
      FixNullability),
    Batch("Subquery", Once,
      UpdateOuterReferences),
    Batch("Cleanup", fixedPoint,
      CleanupAliases)
  )           

Test Example

Understanding time zones

How a java.sql.Timestamp object renders under different time zones:

In GMT+8:

Timestamp "2014-06-24 07:22:15.0"
    - fastTime = 1403565735000
    - "2014-06-24T07:22:15.000+0700"           

In GMT+7 it renders as follows; note that the millisecond value is the same:

Timestamp "2014-06-24 06:22:15.0"
    - fastTime = 1403565735000
    - "2014-06-24T06:22:15.000+0700"           
test("ColumnBatch") {
    val schema = StructType(
      Array(
        StructField("id", IntegerType, nullable = true),
        StructField("birth", DateType, nullable = true),
        StructField("time", TimestampType, nullable = true)
      ))

    val columnarBatch = ColumnarBatch.allocate(schema, MemoryMode.ON_HEAP, 1024)
    val c0 = columnarBatch.column(0)
    val c1 = columnarBatch.column(1)
    val c2 = columnarBatch.column(2)

    c0.putInt(0, 0)
    // 1355241600 epoch seconds (2012-12-12 00:00:00 GMT+8); /3600/24 converts seconds to days
    c1.putInt(0, 1355241600 / 3600 / 24)
    // the timestamp column stores microseconds
    c2.putLong(0, 1355285532000000L)

    val internal0 = columnarBatch.getRow(0)

    //a way converting internal row to unsafe row.
    //val convert = UnsafeProjection.create(schema)
    //val internal = convert.apply(internal0)

    val enc = RowEncoder.apply(schema).resolveAndBind()
    val row = enc.fromRow(internal0)
    val df = spark.createDataFrame(Lists.newArrayList(row), schema)

    TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
    val tsStr0 = df.select(col("time")).head().getTimestamp(0).toString
    val ts0 = df.select(col("time").cast(LongType)).head().getLong(0)

    TimeZone.setDefault(TimeZone.getTimeZone("GMT+8"))
    val tsStr1 = df.select(col("time")).head().getTimestamp(0).toString
    val ts1 = df.select(col("time").cast(LongType)).head().getLong(0)

    assert(true, "2012-12-12 04:12:12.0".equals(tsStr0))
    assert(true, "2012-12-12 12:12:12.0".equals(tsStr1))
    // to long 之後毫秒數都是一樣的
    assert(true, ts0 == ts1)
  }           

Extra: ImplicitCastInputTypes

Suppose we define our own Expr that expects two DateType inputs.

case class MockExpr(d0: Expression, d1: Expression)
  extends BinaryExpression with ImplicitCastInputTypes {

  override def left: Expression = d0

  override def right: Expression = d1

  override def inputTypes: Seq[AbstractDataType] = Seq(DateType, DateType)

  override def dataType: DataType = IntegerType

  override def nullSafeEval(date0: Any, date1: Any): Any = {
    ...
  }
}           

Now suppose we call it as follows, passing two TimestampType columns. Does this behave as expected? It does, thanks to ImplicitCastInputTypes.

lit("2012-11-12 12:12:12.0").cast(TimestampType)
lit("2012-12-12 12:12:12.0").cast(TimestampType)
Column(MockExpr(tsc1.expr, tsc2.expr))           

org.apache.spark.sql.catalyst.analysis.TypeCoercion.ImplicitTypeCasts

case e: ImplicitCastInputTypes if e.inputTypes.nonEmpty =>
  val children: Seq[Expression] = e.children.zip(e.inputTypes).map { case (in, expected) =>
    // If we cannot do the implicit cast, just use the original input.
    implicitCast(in, expected).getOrElse(in)
  }
  e.withNewChildren(children)

def implicitCast(e: Expression, expectedType: AbstractDataType): Option[Expression] = {
  implicitCast(e.dataType, expectedType).map { dt =>
    if (dt == e.dataType) e else Cast(e, dt)
  }
}           
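To see the rule in action (a small sketch reusing df, MockExpr, tsc1 and tsc2 from above), inspect the analyzed plan: each TimestampType child ends up wrapped in a Cast to DateType inserted by ImplicitTypeCasts.

df.select(Column(MockExpr(tsc1.expr, tsc2.expr))).queryExecution.analyzed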

org.apache.spark.sql.catalyst.expressions.Cast#castToDate #DateConverter

private[this] def castToDate(from: DataType): Any => Any = from match {
  case StringType =>
    buildCast[UTF8String](_, s => DateTimeUtils.stringToDate(s).orNull)
  case TimestampType =>
    // throw valid precision more than seconds, according to Hive.
    // Timestamp.nanos is in 0 to 999,999,999, no more than a second.
    buildCast[Long](_, t => DateTimeUtils.millisToDays(t / 1000L, timeZone))
}