Pyspark: повторная выборка частот до миллисекунд - PullRequest
0 голосов
/ 04 июня 2019

Мне нужно соединить два спарк-фрейма данных в столбце метки времени.Проблема состоит в том, что они имеют разные частоты: первый кадр данных (df1) имеет наблюдение каждые 10 минут, а второй (df2) составляет 25 Гц (25 наблюдений в секунду, что в 15000 раз чаще, чем df1).Каждый фрейм данных имеет более 100 столбцов и миллионы строк.Чтобы сделать плавное соединение, я пытаюсь изменить частоту df1 до 25 Гц, заполнить фронтом значения Null, вызванные повторной дискретизацией, а затем соединить кадры данных, как только они будут на той же частоте.Фреймы данных слишком большие, поэтому я пытаюсь использовать spark вместо панд.

Итак, вот вопрос: скажем, у меня есть следующий фрейм данных spark:

sample_df

Я хочу изменить частоту до 25 Гц (25 наблюдений в секунду), чтобы это выглядело так:

Expected_result

Как эффективно это сделать в pyspark?

Примечание:

Я попытался повторно сэмплировать свой df1, используя код из предыдущего вопроса( PySpark: как изменить частоту ), как показано ниже:

from pyspark.sql.functions import col, max as max_, min as min_

freq = x   # x is the frequency in seconds

epoch = (col("timestamp").cast("bigint") / freq).cast("bigint") * freq 

with_epoch  = df1.withColumn("dummy", epoch)

min_epoch, max_epoch = with_epoch.select(min_("dummy"), max_("dummy")).first()

new_df = spark.range(min_epoch, max_epoch + 1, freq).toDF("dummy")

new_df.join(with_epoch, "dummy", "left").orderBy("dummy")
.withColumn("timestamp_resampled", col("dummy").cast("timestamp"))

Кажется, что приведенный выше код работает только тогда, когда предполагаемая частота больше или равна секунде.Например, когда freq = 1, он создает следующую таблицу:

undesired_result

Однако, когда я пропускаю 25 Гц в качестве частоты (т.е. freq = 1/ 25) код завершается ошибкой, поскольку «шаг» в функции spark.range не может быть меньше 1.

Есть ли обходной путь для решения этой проблемы?Или каким-либо другим способом пересчитать частоту до миллисекунд?

1 Ответ

1 голос
/ 04 июня 2019

Если ваша цель - объединить 2 кадра данных, я бы предложил использовать внутреннее объединение напрямую:

df = df1.join(df2, df1.Timestamp == df2.Timestamp)

Однако, если вы хотите уменьшить частоту дискретизации, вы можете преобразовать отметку времени в миллисекунды и сохранить те строки, которые mod(timestamp, 25) == 0. Вы можете использовать это, только если вы уверены, что данные отбираются идеально.

from pyspark.sql.functions import col
df1 = df1.filter( ((col("Timestamp") % 25) == 0 )

Другой вариант - нумеровать каждую строку и сохранять 1 каждые 25. С помощью этого решения вы будете сокращать строки без учета метки времени. Другая проблема этого решения заключается в том, что вам нужно сортировать данные (не эффективно).

PD: преждевременная оптимизация - корень всего зла

Редактировать: метка времени в int

Давайте создадим поддельный набор данных, полный меток времени, используя стандарт эпохи с миллисекундами.

>>>  df = sqlContext.range(1559646513000, 1559646520000)\
                    .select( (F.col('id')/1000).cast('timestamp').alias('timestamp'))
>>> df
DataFrame[timestamp: timestamp]
>>> df.show(5,False)
+-----------------------+
|timestamp              |
+-----------------------+
|2019-06-04 13:08:33    |
|2019-06-04 13:08:33.001|
|2019-06-04 13:08:33.002|
|2019-06-04 13:08:33.003|
|2019-06-04 13:08:33.004|
+-----------------------+
only showing top 5 rows

Теперь конвертируйте обратно в целые числа:

>>> df.select( (df.timestamp.cast('double')*1000).cast('bigint').alias('epoch') )\
      .show(5, False)
+-------------+
|epoch        |
+-------------+
|1559646513000|
|1559646513001|
|1559646513002|
|1559646513003|
|1559646513004|
+-------------+
only showing top 5 rows
...