天天看點

Spark 3.0 對于 DATE 和 TIMESTAMP 的改進

原文連結: https://databricks.com/blog/2020/07/22/a-comprehensive-look-at-dates-and-timestamps-in-apache-spark-3-0.html

翻譯:彭慧波,FreeWheel 基礎架構大資料開發工程師

Spark是一個當下較為熱門的,能同時處理結構化資料和非結構化資料的工具。Spark能夠支援諸如integer, long, double, string等在内的基本資料類型,同時也支援包括DATE和TIMESTAMP在内的複雜的資料類型。這些複雜的資料類型需要開發人員花費大量的時間來了解和使用它們。本文将會深入介紹DATE和TIMESTAMP,力圖使讀者對其有一個深入的了解,避免在使用的過程中犯錯。本文将分為以下四個部分:

  1. DATE的定義及其使用的月曆,這也将包括Spark3.0對所使用月曆的變化;
  2. TIMESTAMP的定義及其與時區的聯系,TIMESTAMP如何通過時區偏移來描述一個具體的時間點,以及Java8和Spark3.0中所使用的新的時間API的變化;
  3. Spark中如何通過API來建構DATE和TIMESTAMP值;
  4. Spark driver收集DATE和TIMESTAMP對象的最佳實踐和常見誤區。

DATE和月曆

對于DATE的定義非常簡單,Date是由年,月,日組合而成的一個字段,比如2012年12月31日。但是,年,月,日有各自的取值範圍,比如,月的取值範圍必須是從1到12,日的取值範圍必須根據年和月的不同可以取值為1到28/29/30/31。是以Date值代表的是真實存在的一天。

年、月、日的限制條件和取值範圍可能是由許多不同的月曆定義的。這些月曆有不同的應用場景,有些月曆隻在特定的地區使用,比如農曆;有些月曆隻在曆史上使用,比如儒略曆;而國際上和人們日常生活中常用的标準是公曆。公曆誕生于1582年,後來其紀年日期也被擴充到1582年之前。這種擴充的月曆也被稱作Proleptic Gregorian公曆。

Proleptic Gregorian月曆目前已經被pandas,R,Apache Arrow等多個資料處理架構所使用, Spark從3.0版本開始使用Proleptic Gregorian公曆。在3.0之前的版本中,Spark同時使用了儒略曆和普通公曆,對于1582年之前的日期使用儒略曆,對于1582年之後的日期使用公曆。Spark對于月曆的這種使用方式是調用Java 8 之前版本中java.sql.Date API造成的,在Java8及其之後的版本中java.time.LocalDate API廢棄了原先使用兩種月曆的模式,轉而使用Proleptic Gregorian公曆。

當然,DATE類型并不與時區相關。

TIMESTAMP和時區

TIMESTAMP采用新的字段擴充了DATE類型:小時,分鐘,秒(可能擁有小數部分)以及一個session範圍内的時區。TIMESTAMP定義了地球上一個具體的時間點。比如,2012年12月31日23時59分59.123456秒,session時區是UTC+01:00。當将TimeStamp值寫入非文本資料源(如Parquet)時,這些值隻是沒有時區資訊的點(如UTC中的TimeStamp)。如果使用不同的session時區寫入和讀取TimeStamp值,可能會看到不同的小時/分鐘/秒字段值,但它們實際上對應的是相同的時間點。

當然,小時、分鐘和秒也有着各自的取值範圍,小時是0-23,分鐘和秒是0-59,Spark支援最高到微秒的精度,而微秒的有效範圍是0到999,999微秒。

在任何時間點,我們都可以根據所在時區的不同,觀察到時鐘的不同顯示值。同樣地,一個時鐘的顯示值也可以根據所在時區的不同代表許多不同的時間點。時區偏移允許明确地将目前的TimeStamp值綁定到一個具體的時間點。時區偏移被定義為格林尼治标準時間(GMT)或協調世界時(UTC+0)的小時偏移。這樣的時區資訊表示消除了歧義,但對普通使用者來說卻是比較不友善。使用者更喜歡指出全球的某個具體位置,譬如美國/洛杉矶或歐洲/巴黎等。

如果用具體位置來代替具體的時區偏移資訊并與TimeStamp進行綁定的話,将會帶來一些額外的問題。譬如,我們必須維護一個特殊的時區資料庫,以将時區名稱映射到具體的偏移量。 由于Spark是運作在JVM上的,是以它将時區到具體偏移量的映射委托給了Java标準庫,該庫從Internet配置設定号碼授權機構的時區資料庫(IANA TZDB)加載資料。 此外,Java标準庫中的映射機制在某些方面會影響Spark的行為。 我們在下面重點介紹其中存在的一些問題。

從Java 8開始,JDK公布了用于操作DATE和TIMESTAMP的新API,Spark從3.0版本也遷移到這些的API中。盡管Java 8和Java7對于時區名稱到偏移量的映射使用了相同的源資料庫IANA TZDB,但是二者實作的方式還是有所不同的。

舉例來說,讓我們看一下1883年之前的美國/洛杉矶時區的一個TimeStamp:1883-11-10 00:00:00。 這個時間點之是以與衆不同是因為在1883年11月18日當天,所有北美鐵路都切換到了一個新的标準時間系統來管理其時間表。使用Java 7的時間API,我們可以獲得本地TimeStamp為-08:00的時區偏移量:

scala> java.time.ZoneId.systemDefault
res0: java.time.ZoneId = America/Los_Angeles
scala> java.sql.Timestamp.valueOf("1883-11-10 00:00:00").getTimezoneOffset / 60.0
res1: Double = 8.0
           

Java 8 API卻傳回了不同的結果:

scala> java.time.ZoneId.of("America/Los_Angeles")
.getRules.getOffset(java.time.LocalDateTime.parse("1883-11-10T00:00:00"))
res2: java.time.ZoneOffset = -07:52:58
           

在1883年11月18日之前,時間并不是全球統一的,大多數城鎮都各自使用某種形式的本地統一時間,該時間由著名的時鐘維護(例如,在教堂的尖頂上或在珠寶商的窗戶中) 。 這就是為什麼我們會看到如此奇怪的時區偏移。

該示例說明了Java 8時間函數更加精确,并考慮了IANA TZDB的曆史資料。 切換到Java 8時間API後,Spark 3.0也從這一改進中受益,并在解決時區偏移方面變得更加精确。

正如我們前面提到的,Spark 3.0也将DATE切換為Proleptic Gregorian月曆。TIMESTAMP也是如此。Spark 3.0完全符合 ISO SQL:2016提出的标準,TIMESTAMP的有效範圍在0001-01-01 00:00:00到9999-12-31 23:59:59.999999之間。并支援該範圍内的所有TimeStamp。 而Spark 2.4和更早版本卻存在以下幾個問題:

  1. 0001-01-01 00:00:00至1582-10-03 23:59:59.999999這一範圍内的時間, Spark 2.4使用儒略曆,不符合ISO SQL:2016的标準。 Spark 3.0則使用了Proleptic Gregorian公曆來擷取年,月,日等資訊。由于采用月曆不同,Spark 2.4中存在某些Spark3.0中不存在的日期,譬如,1000-02-29不是有效日期,因為公曆中的1000年不是一個閏年。 同時,Spark 2.4在這TimeStamp範圍内也将時區名稱解析到了錯誤的時區偏移上。
  2. 1582-10-04 00:00:00至 1582-10-14 23:59:59.999999 這一範圍内的時間,是Spark 3.0中有效的本地TimeStamp範圍, Spark 2.4則不存在這樣的TimeStamp。
  3. 1582-10-15 00:00:00至1899-12-31 23:59:59.999999 這一範圍内的時間, Spark 3.0使用IANA TZDB中的曆史資料可以正确解析時區偏移。Spark 2.4則如我們在上面的示例中所描述的,在某些情況下可能無法正确解析時區名稱所對應的時區偏移。
  4. 1900-01-01 00:00:00至2036-12-31 23:59:59.999999 這一範圍内的時間, Spark 3.0和Spark 2.4均符合ANSI SQL标準,并在日期/時間操作(例如擷取某個給定月份中的某天)中使用公曆。
  5. 2037-01-01 00:00:00至9999-12-31 23:59:59.999999 這一範圍内的時間,由于JDK的bug #8073446,Spark 2.4可以解析時區偏移,但是會在某些特定夏令時出現錯誤解析的情況。 Spark 3.0則不受此bug的影響。

将時區名稱映射到偏移量帶來的另一方面問題是夏令時(DST)的應用或切換到另一個标準時區偏移量而導緻的本地TimeStamp的重疊。例如,2019年11月3日02:00:00,時鐘向後調了1小時到01:00:00。2019-11-03 01:30:00 美國 / 洛杉矶對應的時間點可以映射到2019-11-03 01:30:00 UTC-08:00或2019-11-03 01:30:00 UTC-07:00。在Spark3.0中,本地TimeStamp将時區名稱映射到偏移量的時候,切換到夏時制可能會導緻本地TimeStamp重疊的情況。如果可能的話,建議在建構TimeStamp時指定确切的時區偏移量。如果未指定偏移量,而隻是設定時區名稱(例如'2019-11-03 01:30:00 美國 / 洛杉矶'),Spark 3.0将采用較早的偏移量,通常對應于“夏季”。Spark 2.4則有所不同,其采用“冬季”偏移。是以,在時鐘向前跳的缺口時間範圍内将沒有有效的偏移量。從上面的示例可以看出,時區名稱到時區偏移量的映射是不明确的,并且不是一對一的。 是以再次強調,在建構TimeStamp時指定确切的時區偏移量,例如TimeStamp '2019-11-03 01:30:00 UTC-07:00'。

接下來我們将讨論時區的偏移量映射問題,ANSI SQL标準定義了以下兩種TimeStamp:

1.不帶時區的TimeStamp:其包含内容為年,月,日,小時,分鐘,秒。 這些時間機關不與任何時區綁定,實際上就是我們常看到牆上挂着的時鐘時間點。

2.帶有時區的TimeStamp:其包含内容為年,月,日,小時,分鐘,秒,時區偏移小時,時區偏移分鐘。 帶有時區的TimeStamp表示了由UTC時區中的時刻與目前時區偏移量根據小時和分鐘分别進行計算組合而成的時間值。

帶有時區的TimeStamp中包含的時區偏移量不會影響TimeStamp本身所表示的實體時間點。 相反,時區偏移量會影響需要顯示或者列印TimeStamp時候的一些操作,比如,日期/時間提取(例如EXTRACT)以及其它與時區相關的操作(例如向TimeStamp添加月份)。

Spark SQL将TimeStamp類型定義為帶session時區的TimeStamp,這是由字段年,月,日,小時,分鐘,秒,session時區的組合,其中年到秒這部分字段辨別了UTC時間的某一時刻。 session時區則是從Spark SQL配置中spark.sql.session.timeZone參數值擷取。 其中session時區可以進行如下設定:

1.時區偏移量的形式為 '(+|-) HH:mm', 這種形式能使我們能夠準确地定義一個實體時間點。

2.時區名稱也稱之為時區ID,其具體表現形式為“區域/城市”,例如“ 美國/洛杉矶”。 但是,這種形式的時區資訊會使我們遇到上面描述的一些問題,例如本地TimeStamp的重疊。對于任何時區ID,每個UTC時刻都明确地與一個時區偏移量相關聯,是以,每個具有時區ID的TimeStamp都可以明确地轉換為具有區域偏移量的TimeStamp。

預設情況下,session時區會設定為Java虛拟機的預設時區。

Spark中帶有session時區的TimeStamp與以下TimeStamp有所不同:

1.不帶時區的TimeStamp,因為不帶時區的TimeStamp值可以映射到多個實體時刻,但是帶有session時區的TimeStamp的任何值都是具體的實體時刻。是以,寫Spark SQL的時候可以通過在所有會話中使用一個固定的時區偏移來進行統一,例如UTC+0。 在這種情況下,我們可以将UTC的TimeStamp視為本地TimeStamp。

2.帶有時區的TimeStamp,因為根據SQL标準,該類型的列值可以具有不同的時區偏移量。 Spark SQL不支援該功能。

我們應該注意到,與session時區關聯的TimeStamp并不是Spark SQL的新發明。 Oracle等關系型資料庫也為TimeStamp提供了類似的類型:目前時區TimeStamp。

建構Date和TimeStamp

Spark SQL提供了一些構造Date和TimeStamp值的方法:

  1. 不帶參數的預設構造函數:
CURRENT_TIMESTAMP()和CURRENT_DATE()           
  1. 來自其他原始Spark SQL類型,例如INT,LONG和STRING;
  2. 來自外部類型,例如Python datetime或Java類java.time.LocalDate / Instant;

4.從資料源CSV,JSON,Avro,Parquet,ORC或其他類型中反序列化而來。

Spark 3.0中引入的函數MAKE_DATE具有三個參數:年,月,日。這三個參數共同構成了Date值。Spark盡可能将所有輸入的參數隐式轉換為INT類型。 該函數檢查結果Date值在Proleptic Gregorian公曆中是否是有效的,如果不是則傳回NULL。 例如在PySpark中:

>>> spark.createDataFrame([(2020, 6, 26), (1000, 2, 29), (-44, 1, 1)],
... ['Y', 'M', 'D']).createTempView('YMD')
>>> df = sql('select make_date(Y, M, D) as date from YMD')
>>> df.printSchema()
root
 |-- date: date (nullable = true)           

為了列印DataFrame的内容,可以調用Spark 中的show() 算子,在executor端将Date轉為字元串,并将字元串發送到driver端并在控制台上輸出:

>>> df.show()
+-----------+
|       date|
+-----------+
| 2020-06-26|
|       null|
|-0044-01-01|
+-----------+
           

同樣,可以通過MAKE_TIMESTAMP函數設定TimeStamp值。 與MAKE_DATE函數一樣,MAKE_TIMESTAMP對Date字段執行相同的驗證,并對時間字段進行額外的驗證,小時的允許範圍為0-23,分鐘的允許範圍為0-59,秒的允許範圍為0-60。其中,秒的類型為小數,其精度為8,包含6位小數位。由于秒可以進一步細化,是以小數部分的精度最高為微秒。 例如在PySpark中:

>>> df = spark.createDataFrame([(2020, 6, 28, 10, 31, 30.123456),
... (1582, 10, 10, 0, 1, 2.0001), (2019, 2, 29, 9, 29, 1.0)],
... ['YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND'])
>>> df.show()
+----+-----+---+----+------+---------+
|YEAR|MONTH|DAY|HOUR|MINUTE|   SECOND|
+----+-----+---+----+------+---------+
|2020|    6| 28|  10|    31|30.123456|
|1582|   10| 10|   0|     1|   2.0001|
|2019|    2| 29|   9|    29|      1.0|
+----+-----+---+----+------+---------+

>>> ts = df.selectExpr("make_timestamp(YEAR, MONTH, DAY, HOUR, MINUTE, SECOND) as MAKE_TIMESTAMP")
>>> ts.printSchema()
root
 |-- MAKE_TIMESTAMP: timestamp (nullable = true)   
           

就像處理Date一樣,讓我們使用show()操作列印ts DataFrame的内容。 以類似的方式,show()将TimeStamp轉換為字元串,但是現在它考慮了由Spark SQL配置spark.sql.session.timeZone定義的session時區參數。 在以下示例中,我們将看到這一點。

>>> ts.show(truncate=False)
+--------------------------+
|MAKE_TIMESTAMP            |
+--------------------------+
|2020-06-28 10:31:30.123456|
|1582-10-10 00:01:02.0001  |
|null                      |
           

Spark無法建立最後一個TimeStamp,因為2019年不是一個閏年,該日期是無效的。

此時你可能會注意到,上面的示例中我們沒有提供任何時區資訊。 在這種情況下,Spark從spark.sql.session.timeZone中擷取時區值,并将其應用于函數調用。當然,也可以通過将其他時區作為MAKE_TIMESTAMP的最後一個參數傳遞來選擇其他時區。以下是PySpark中的示例:

>>> df = spark.createDataFrame([(2020, 6, 28, 10, 31, 30, 'UTC'),
...     (1582, 10, 10, 0, 1, 2, 'America/Los_Angeles'),
...     (2019, 2, 28, 9, 29, 1, 'Europe/Moscow')],
...     ['YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'TZ'])
>>> df = df.selectExpr('make_timestamp(YEAR, MONTH, DAY, HOUR, MINUTE, SECOND, TZ) as MAKE_TIMESTAMP')
>>> df = df.selectExpr("date_format(MAKE_TIMESTAMP, 'yyyy-MM-dd HH:mm:SS VV') AS TIMESTAMP_STRING")
>>> df.show(truncate=False)
+---------------------------------+
|TIMESTAMP_STRING                 |
+---------------------------------+
|2020-06-28 13:31:00 Europe/Moscow|
|1582-10-10 10:24:00 Europe/Moscow|
|2019-02-28 09:29:00 Europe/Moscow|
+---------------------------------+

           

如上例所示,Spark考慮了指定時區的問題,但是會将所有本地TimeStamp都調整為session時區。 是以,傳遞給MAKE_TIMESTAMP函數的原始時區資訊将丢失,因為帶有session時區的TimeStamp類型假定了所有值都屬于一個時區,甚至不為每個值存儲一個時區資訊。根據帶有session時區的TimeStamp的定義,Spark在UTC時區中存儲本地TimeStamp,并在提取日期/時間字段或将TimeStamp轉換為字元串的時候使用session時區。

同樣,可以使用LONG類型構造TimeStamp。 如果LONG類型值是自1970年1月1日00:00:00Z以來的秒數,則可以将其強制轉換為Spark SQL中的TimeStamp:

spark-sql> select CAST(-123456789 AS TIMESTAMP);
1966-02-02 05:26:51           

不幸的是,目前這種方法不允許我們指定秒的小數部分。 未來,Spark SQL将提供一些特定的函數通過自1970年1月1日00:00:00Z以來的秒,毫秒和微秒來建立

TimeStamp:timestamp_seconds(),timestamp_millis()和timestamp_micros()。           

另一種方法是通過STRING類型值來構造Date和TimeStamp。 我們可以使用特殊的關鍵字來進行轉化:

spark-sql> select timestamp '2020-06-28 22:17:33.123456 Europe/Amsterdam', date '2020-07-01';
2020-06-28 23:17:33.123456    2020-07-01           

或通過強制轉換,可以将轉換應用到列中的所有值:

spark-sql> select cast('2020-06-28 22:17:33.123456 Europe/Amsterdam' as timestamp), cast('2020-07-01' as date);
2020-06-28 23:17:33.123456    2020-07-01           

如果在輸入字元串中省略了時區資訊,Spark則将輸入TimeStamp字元串解釋為指定時區或session時區中的本地TimeStamp。此外,可以使用to_timestamp()函數将具有特定模式的字元串轉換為TimeStamp。 在Datetime Patterns for Formatting and Parsing一文中描述了支援進行轉換的比對模式:

spark-sql> select to_timestamp('28/6/2020 22.17.33', 'dd/M/yyyy HH.mm.ss');
2020-06-28 22:17:33           

如果未指定任何比對模式,則to_timestamp()函數的行為類似于強制轉換。

為了提高可用性,Spark SQL可以在上述所有接受字元串并傳回TimeStamp和Date的方法中識别特殊字元串值:

  • epoch是Date ”1970-01-01”或TimeStamp ” 1970-01-01 00:00:00Z”的别名;
  • now是session時區的目前Date或TimeStamp。 在單個查詢中,它總是産生相同的結果;
  • today是TIMESTAMP類型的目前日的開始,或者是DATE類型的目前日的開始;
  • tomorrow是TIMESTAMP類型的第二天的開始,或者是DATE類型的第二天的開始。
  • yesterday是TIMESTAMP類型的目前日的前一天的開始。

例如,

spark-sql> select timestamp 'yesterday', timestamp 'today', timestamp 'now', timestamp 'tomorrow';
2020-06-27 00:00:00    2020-06-28 00:00:00    2020-06-28 23:07:07.18    2020-06-29 00:00:00
spark-sql> select date 'yesterday', date 'today', date 'now', date 'tomorrow';
2020-06-27    2020-06-28    2020-06-28    2020-06-29           

Spark有一個強大的功能是可以從driver端的存在的外部對象集合中構造資料集,并建立相應類型的列。 Spark将外部類型的相關資料轉換為語義上等效的内部表示形式。 PySpark允許從Python集合中的DATE和TIMESTAMP列建立資料集,例如:

>>> import datetime
>>> df = spark.createDataFrame([(datetime.datetime(2020, 7, 1, 0, 0, 0),
...     datetime.date(2020, 7, 1))], ['timestamp', 'date'])
>>> df.show()
+-------------------+----------+
|          timestamp|      date|
+-------------------+----------+
|2020-07-01 00:00:00|2020-07-01|
+-------------------+----------+           

PySpark使用系統時區在driver端将Python的datetime對象轉換為内部Spark SQL表示形式,該時區可能與Spark中spark.sql.session.timeZone設定的session時區不同。Spark SQL内部值不包含任何有關原始時區的資訊。 根據帶session時區的TimeStamp的定義,接下來并行化對Date和TimeStamp的操作的時候将僅考慮Spark SQL中的session時區。

與我們上面針對Python集合示範的方式類似,Spark在Java / Scala API中将以下類型識别為外部date-time類型:

  • java.sql.Date和java.time.LocalDate作為Spark SQL的DATE類型的外部類型;
  • java.sql.Timestamp和java.time.Instant作為Spark SQL的TIMESTAMP類型的外部類型。

java.sql. 和java.time. 之間是有所差別的。Java 8中添加了基于Proleptic Gregorian月曆的java.time.LocalDate類和java.time.Instant類,Proleptic Gregorian月曆同時也應用于Spark3.0中。java.sql.Date和java.sql.Timestamp類卻從1582年10月15日以來,引用了儒略曆和公曆混合的模式,這種方式同時與Spark3.0之前的版本使用的月曆方式相同。由于月曆系統的不同,Spark必須在轉換為内部Spark SQL的時候執行其他操作,并将輸入Date/TimeStamp值所引用的月曆進行轉換。對于1900年以後的TimeStamp而言,重置操作的開銷很小,但對于1900年以前的TimeStamp卻又不小的負擔。

下面的示例顯示了如何使用Scala集合制作TimeStamp。 在第一個示例中,說明了如何從字元串構造一個java.sql.Timestamp對象。 valueOf()函數将輸入字元串解釋為預設JVM時區中的本地TimeStamp,該時區可能與Spark的session時區不同。如果需要在特定時區構造java.sql.Timestamp或java.sql.Date的對象,建議使用java.text.SimpleDateFormat類(及其setTimeZone方法)或java.util.Calendar類。

scala> Seq(java.sql.Timestamp.valueOf("2020-06-29 22:41:30"), new java.sql.Timestamp(0)).toDF("ts").show(false)
+-------------------+
|ts                 |
+-------------------+
|2020-06-29 22:41:30|
|1970-01-01 03:00:00|
+-------------------+
scala> Seq(java.time.Instant.ofEpochSecond(-12219261484L), java.time.Instant.EPOCH).toDF("ts").show
+-------------------+
|                 ts|
+-------------------+
|1582-10-15 11:12:13|
|1970-01-01 03:00:00|
+-------------------+           

同樣,我們可以從java.sql.Date類或java.LocalDate類的對象集合中建立一個日期列。 java.LocalDate對象的轉化完全獨立于Spark的session時區或JVM預設時區,但是java.sql.Date對象卻并非如此:

  1. java.sql.Date對象表示Spark driver端上預設JVM時區的本地日期;
  2. 為了正确轉換Spark SQL值,driver和executor上的預設JVM時區必須相同。
scala> Seq(java.time.LocalDate.of(2020, 2, 29), java.time.LocalDate.now).toDF("date").show
+----------+
|      date|
+----------+
|2020-02-29|
|2020-06-29|
+----------+           

為了避免任何與月曆和時區相關的問題,我們建議在Java / Scala并行化操作TimeStamp或Date的時候使用Java 8中的java.LocalDate/ Instant作為外部資料類型。

收集Date和TimeStamp

并行化執行的逆操作是将executor端的Date和TimeStamp收集回driver端,并傳回一組外部可列印的資料類型。 例如,在上面的示例中,我們可以通過collect()算子将DataFrame拉回到driver端:

>>> df.collect()
[Row(timestamp=datetime.datetime(2020, 7, 1, 0, 0), date=datetime.date(2020, 7, 1))]           

Spark将UTC時區的date和timestamps列的值作為時間點從executor端傳輸到driver端,并在driver端根據系統時區将其轉換為Python datetime對象,而不使用Spark SQL會話時區。 collect()算子與上一節中描述的show()算子不同。 show()在将TimeStamp轉換為字元串的時候使用session時區,并在driver端收集結果字元串。

在Java和Scala API中,Spark預設執行以下轉換:

  1. Spark SQL中的Date值将轉換為java.sql.Date的對象;
  2. Spark SQL中的TimeStamp值轉換為java.sql.Timestamp的對象。

以上兩種轉換均在driver端的預設JVM時區中執行。 這樣,通過Date.getDay(),getHour()等函數以及通過Spark SQL函數DAY,HOUR變量,根據driver端上的預設JVM時區或executor端session時區對于同一時間列進行轉化擷取的結果是相同的。

與使用java.sql.Date/Timestamp類建立Date和TimeStamp類似,Spark 3.0的執行需要從Proleptic Gregorian月曆到混合月曆(儒略曆+公曆)的重新設定。 對于1582年之後Date和1900年之後的TimeStamp而言,此操作幾乎沒有什麼額外的開銷,但是對于1582年之前的Date和1900年之前的TimeStamp則可能會帶來一些開銷。

如果将SQL 配置中spark.sql.datetime.java8API.enabled參數設定為true的話,則可以避免此類與月曆相關的問題,此時Spark傳回自Java 8開始添加的java.time類型。則Dataset的collect()算子将傳回将傳回如下結果:

  • Spark SQL的DATE類型對應的是java.time.LocalDate類;
  • Spark SQL的TIMESTAMP類型對應的是java.time.Instant類。

現在,轉換不會遇到與月曆相關的問題,因為Java 8類型和Spark SQL 3.0都基于Proleptic Gregorian月曆。 collect()算子不再取決于預設的JVM時區,TimeStamp轉換完全不取決于時區。 關于Date的轉換,其使用了SQL 配置中spark.sql.session.timeZone設定的session時區。 例如,讓我們看一個具有DATE和TIMESTAMP列的資料集,将預設JVM時區設定為歐洲/莫斯科,将會話時區設定為美國/洛杉矶。

scala> java.util.TimeZone.getDefault
res1: java.util.TimeZone = sun.util.calendar.ZoneInfo[id="Europe/Moscow",...]

scala> spark.conf.get("spark.sql.session.timeZone")
res2: String = America/Los_Angeles

scala> df.show
+-------------------+----------+
|          timestamp|      date|
+-------------------+----------+
|2020-07-01 00:00:00|2020-07-01|
+-------------------+----------+           

show()算子會列印帶有session時區美國/洛杉矶的TimeStamp,但是如果我們需要收集資料集,它将被轉換為java.sql.Timestamp對象并通過toString方法顯示具有歐洲/莫斯科的字元串:

scala> df.collect()
res16: Array[org.apache.spark.sql.Row] = Array([2020-07-01 10:00:00.0,2020-07-01])

scala> df.collect()(0).getAs[java.sql.Timestamp](0).toString
res18: java.sql.Timestamp = 2020-07-01 10:00:00.0           

實際上,本地TimeStamp ‘2020-07-01 00:00:00’對應的是UTC的2020-07-01T07:00:00Z。可以觀察到,如果啟用Java 8 API并收集資料集将是如下的結果:

scala> df.collect()
res27: Array[org.apache.spark.sql.Row] = Array([2020-07-01T07:00:00Z,2020-07-01])           

java.time.Instant對象将被轉換為任何本地TimeStamp而不依賴于任何JVM時區資訊。這是java.time.Instant相對于java.sql.Timestamp的優點之一。

java.sql.Timestamp要求更改全局JVM設定,這會影響同一JVM上的其他TimeStamp。是以,如果需要應用程式在不同時區中處理Date或TimeStamp,并且通過Java / Scala Dataset.collect() API将資料收集到driver端的時候,應用程式之間不會發生沖突的話,在Spark SQL中配置spark.sql.datetime.java8API.enabled,切換到Java 8 API 的配置。

總結

在此部落格文章中,我們描述了Spark SQL DATE和TIMESTAMP類型,展示了如何從其它原始Spark SQL類型和外部Java類型構造Date和TimeStamp列,以及如何将Date和TimeStamp列作為外部Java類型收集回Spark driver端。從Spark 3.0版本開始,Spark從結合了儒略曆和普通曆的混合月曆切換到了Proleptic Gregorian公曆(更多資訊請參閱SPARK-26651)。如我們先前示範的,這使Spark消除了許多問題。 為了讓Spark3.0以前的版本向後相容,Spark仍可以通過調用collect算子顯式指定java.sql.Date和java.sql.Timestamp傳回混合月曆中的Date和TimeStamp。為避免使用Java / Scala的collect操作時出現月曆和時區解析問題,可以通過SQL配置spark.sql.datetime.java8API.enabled來啟用Java 8 API。Spark3.0作為Databricks Runtime 7.0的一部分,目前可以在Databricks平台上進行免費試用。

阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學線上提問答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

Spark 3.0 對于 DATE 和 TIMESTAMP 的改進