Мне нужно соединить два спарк-фрейма данных в столбце метки времени.Проблема состоит в том, что они имеют разные частоты: первый кадр данных (df1) имеет наблюдение каждые 10 минут, а второй (df2) составляет 25 Гц (25 наблюдений в секунду, что в 15000 раз чаще, чем df1).Каждый фрейм данных имеет более 100 столбцов и миллионы строк.Чтобы сделать плавное соединение, я пытаюсь изменить частоту df1 до 25 Гц, заполнить фронтом значения Null, вызванные повторной дискретизацией, а затем соединить кадры данных, как только они будут на той же частоте.Фреймы данных слишком большие, поэтому я пытаюсь использовать spark вместо панд.
Итак, вот вопрос: скажем, у меня есть следующий фрейм данных spark:
Я хочу изменить частоту до 25 Гц (25 наблюдений в секунду), чтобы это выглядело так:
Как эффективно это сделать в pyspark?
Примечание:
Я попытался повторно сэмплировать свой df1, используя код из предыдущего вопроса( PySpark: как изменить частоту ), как показано ниже:
from pyspark.sql.functions import col, max as max_, min as min_
freq = x # x is the frequency in seconds
epoch = (col("timestamp").cast("bigint") / freq).cast("bigint") * freq
with_epoch = df1.withColumn("dummy", epoch)
min_epoch, max_epoch = with_epoch.select(min_("dummy"), max_("dummy")).first()
new_df = spark.range(min_epoch, max_epoch + 1, freq).toDF("dummy")
new_df.join(with_epoch, "dummy", "left").orderBy("dummy")
.withColumn("timestamp_resampled", col("dummy").cast("timestamp"))
Кажется, что приведенный выше код работает только тогда, когда предполагаемая частота больше или равна секунде.Например, когда freq = 1, он создает следующую таблицу:
Однако, когда я пропускаю 25 Гц в качестве частоты (т.е. freq = 1/ 25) код завершается ошибкой, поскольку «шаг» в функции spark.range не может быть меньше 1.
Есть ли обходной путь для решения этой проблемы?Или каким-либо другим способом пересчитать частоту до миллисекунд?