У меня есть несколько файлов паркета, по одному на каждый датчик, который содержит данные временных рядов. То, что я пытаюсь сделать, это повторно заполнить и переслать. Я знаю, как это сделать, когда есть один датчик, но у меня проблема с эффективным достижением того же самого.
Сейчас я читаю по одному файлу / датчику за раз, повторная выборка, перешлите его вперед, поместите в массив и затем объедините все имеющиеся в массиве dfs, что очень медленно. Мне интересно, могу ли я как-то прочитать все данные сразу, а затем использовать rdd
вместо df
, чтобы обработать каждый датчик как отдельный раздел и присоединиться к ним.
Объединенные данные выглядят так:
+----------+-------------+----------+-------------+
| epoch| value| timestamp| tag|
+----------+-------------+----------+-------------+
|1493571720| 9.546202E-05|1493571725| SA|
|1493571720| 0.02965982|1493571735| SA|
|1493571720| -0.001335071|1493571745| SB|
|1493571960| 0|1493572005| SB|
|1493571960| 100|1493572005| SB|
|1493571960| 0|1493571985| SC|
|1493571960| 100|1493572005| SC|
|1493572680|-0.0003813824|1493572695| SC|
+----------+-------------+----------+-------------+
И вот что я написал до сих пор:
# Read each file one by one, resample, forwardfill and join
resample_unit_sec=60
source_path = 'hdfs://s-mac/user/waqas/parquet/tag={}'
files = ['SA', 'SB', 'SC']
for file in files:
parq_df = spark.read.parquet(source_bucket.format(file))
epoch = (F.col("timestamp").cast("bigint") / resample_unit_sec).cast("bigint") * resample_unit_sec
with_epoch = parq_df.withColumn("epoch", epoch)
min_epoch, max_epoch = with_epoch.select(F.min("epoch"), F.max("epoch")).first()
ref = spark.range(min_epoch, max_epoch + 1, resample_unit_sec).toDF("epoch")
final = ref.join(with_epoch, "epoch", "left").orderBy("epoch").withColumn("TS_resampled", F.col("epoch").cast("timestamp"))
dfs.append(final.cache())
for each df:
window_ff = Window.orderBy('TS_resampled').rowsBetween(-sys.maxsize, 0)
ffilled_column = F.last(df['value'], ignorenulls=True).over(window_ff)
filled_final = df.withColumn('ffilled', ffilled_column)
filled_final = filled_final.groupBy(['TS_resampled']).pivot('tag').agg({'ffilled':'mean'}).orderBy('TS_resampled')