Работа с микросекундными метками времени в PySpark - PullRequest
0 голосов
/ 17 января 2019

У меня есть фрейм данных pyspark со следующим форматом времени 20190111-08:15:45.275753. Я хочу преобразовать это в формат отметки времени, сохраняя детализацию микросекунды. Тем не менее, кажется, что трудно сохранить микросекунды, поскольку все преобразования времени в pyspark производят секунды?

У вас есть подсказка, как это можно сделать? Обратите внимание, что преобразование его в панды и т. Д. Не будет работать, так как набор данных огромен, поэтому мне нужен эффективный способ сделать это. Пример того, как я делаю это ниже

time_df = spark.createDataFrame([('20150408-01:12:04.275753',)], ['dt'])
res = time_df.withColumn("time",  unix_timestamp(col("dt"), \
format='yyyyMMdd-HH:mm:ss.000').alias("time"))
res.show(5, False)

Ответы [ 2 ]

0 голосов
/ 18 января 2019

Я нашел обходной путь для этого, используя функцию to_utc_timestamp в pyspark, однако не совсем уверен, является ли это наиболее эффективным, хотя, похоже, он отлично работает на примерно 100 млн строк данных. Вы можете избежать regex_replace, если ваша строка метки времени выглядела так - 1997-02-28 10: 30: 40.897748

 from pyspark.sql.functions import regexp_replace, to_utc_timestamp

 df = spark.createDataFrame([('19970228-10:30:40.897748',)], ['new_t'])
 df = df.withColumn('t', regexp_replace('new_t', '^(.{4})(.{2})(.{2})-', '$1-$2-$3 '))
 df = df.withColumn("time", to_utc_timestamp(df.t, "UTC").alias('t'))
 df.show(5,False)
 print(df.dtypes)
0 голосов
/ 17 января 2019

Обычно гранулярность отметки времени указывается в секундах, поэтому я не думаю, что существует прямой метод сохранения гранулярности в миллисекундах.

В pyspark есть функция unix_timestamp, которая:

unix_timestamp(timestamp=None, format='yyyy-MM-dd HH:mm:ss')

Преобразование строки времени с заданным шаблоном ('yyyy-MM-dd HH:mm:ss', по умолчанию) в метку времени Unix ( в секундах ), используя часовой пояс по умолчанию и по умолчанию locale, вернуть ноль, если не получится.

if `timestamp` is None, then it returns current timestamp.

>>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>>> time_df = spark.createDataFrame([('2015-04-08',)], ['dt'])
>>> time_df.select(unix_timestamp('dt', 'yyyy-MM-dd').alias('unix_time')).collect()
[Row(unix_time=1428476400)]
>>> spark.conf.unset("spark.sql.session.timeZone")

Пример использования:

import pyspark.sql.functions as F
res = df.withColumn(colName,  F.unix_timestamp(F.col(colName), \
    format='yyyy-MM-dd HH:mm:ss.000').alias(colName) )

Что вы можете сделать, это разделить строку даты (str.rsplit('.', 1)), разделяя миллисекунды (например, путем создания другого столбца) в вашем фрейме данных.

EDIT

В вашем примере проблема в том, что время имеет тип string. Сначала вам нужно преобразовать его в тип timestamp: это можно сделать с помощью:

res = time_df.withColumn("new_col", to_timestamp("dt", "yyyyMMdd-hh:mm:ss"))

Тогда вы можете использовать unix_timestap

res2 = res.withColumn("time",  F.unix_timestamp(F.col("parsed"), format='yyyyMMdd-hh:mm:ss.000').alias("time"))

Наконец, чтобы создать столбцы с миллисекундами:

res3 = res2.withColumn("ms", F.split(res2['dt'], '[.]').getItem(1))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...