Почему функция агрегирования pyspark.sql.functions.collect_list () добавляет локальное смещение часового пояса на дисплей? - PullRequest
0 голосов
/ 17 июня 2019

Я запускаю следующий код в сеансе оболочки pyspark. Запуск collect_list () после groupBy изменяет способ отображения меток времени (добавлено смещение UTC + 02: 00, возможно, потому, что это локальное смещение в Греции, где выполняется код). Хотя дисплей проблематичен, временная метка под капотом остается неизменной. Это можно наблюдать либо путем добавления столбца с фактическими временными метками Unix, либо путем возврата кадра данных к его первоначальной форме с помощью pyspark.sql.functions.explode (). Это ошибка?

import datetime
import os
from pyspark.sql import functions, types, udf

# configure utc timezone
spark.conf.set("spark.sql.session.timeZone", "UTC")
os.environ['TZ']
time.tzset()

# create DataFrame
date_time = datetime.datetime(year = 2019, month=1, day=1, hour=12)
data = [(1, date_time), (1, date_time)]
schema = types.StructType([types.StructField("id", types.IntegerType(), False), types.StructField("time", types.TimestampType(), False)])
df_test = spark.createDataFrame(data, schema)

df_test.show()
+---+-------------------+
| id|               time|
+---+-------------------+
|  1|2019-01-01 12:00:00|
|  1|2019-01-01 12:00:00|
+---+-------------------+

# GroupBy and collect_list
df_test1 = df_test.groupBy("id").agg(functions.collect_list("time"))
df_test1.show(1, False)
+---+----------------------------------------------+
|id |collect_list(time)                            |
+---+----------------------------------------------+
|1  |[2019-01-01 14:00:00.0, 2019-01-01 14:00:00.0]|
+---+----------------------------------------------+

# add column with unix timestamps
to_timestamp = functions.udf(lambda x : [value.timestamp() for value in x], types.ArrayType(types.FloatType()))
df_test1.withColumn("unix_timestamp",to_timestamp(functions.col("collect_list(time)")))
df_test1.show(1, False)
+---+----------------------------------------------+----------------------------+
|id |collect_list(time)                            |unix_timestamp              |
+---+----------------------------------------------+----------------------------+
|1  |[2019-01-01 14:00:00.0, 2019-01-01 14:00:00.0]|[1.54634394E9, 1.54634394E9]|
+---+----------------------------------------------+----------------------------+

# explode list to distinct rows
df_test1.groupBy("id").agg(functions.collect_list("time")).withColumn("test", functions.explode(functions.col("collect_list(time)"))).show(2, False)
+---+----------------------------------------------+-------------------+
|id |collect_list(time)                            |test               |
+---+----------------------------------------------+-------------------+
|1  |[2019-01-01 14:00:00.0, 2019-01-01 14:00:00.0]|2019-01-01 12:00:00|
|1  |[2019-01-01 14:00:00.0, 2019-01-01 14:00:00.0]|2019-01-01 12:00:00|
+---+----------------------------------------------+-------------------+


пс. 1.54634394E9 == 2019-01-01 12:00:00, что является правильной отметкой времени UTC

1 Ответ

0 голосов
/ 17 июня 2019

Для меня приведенный выше код работает, но не конвертирует время, как в вашем случае. Возможно, проверьте часовой пояс вашего сеанса (и, необязательно, установите для него значение tz):

spark.conf.get('spark.sql.session.timeZone')

В общем случае TimestampType в pyspark не поддерживает tz, как в Pandas, а передает длинные целые и отображает их в соответствии с часовым поясом вашего компьютера (по умолчанию).

...