Повторная выборка и прямое заполнение данных несколькими тегами - PullRequest
0 голосов
/ 25 октября 2019

У меня есть несколько файлов паркета, по одному на каждый датчик, который содержит данные временных рядов. То, что я пытаюсь сделать, это повторно заполнить и переслать. Я знаю, как это сделать, когда есть один датчик, но у меня проблема с эффективным достижением того же самого.

Сейчас я читаю по одному файлу / датчику за раз, повторная выборка, перешлите его вперед, поместите в массив и затем объедините все имеющиеся в массиве dfs, что очень медленно. Мне интересно, могу ли я как-то прочитать все данные сразу, а затем использовать rdd вместо df, чтобы обработать каждый датчик как отдельный раздел и присоединиться к ним.

Объединенные данные выглядят так:

+----------+-------------+----------+-------------+
|     epoch|        value| timestamp|          tag|
+----------+-------------+----------+-------------+
|1493571720| 9.546202E-05|1493571725|           SA|
|1493571720|   0.02965982|1493571735|           SA|
|1493571720| -0.001335071|1493571745|           SB|
|1493571960|            0|1493572005|           SB|
|1493571960|          100|1493572005|           SB|
|1493571960|            0|1493571985|           SC|
|1493571960|          100|1493572005|           SC|
|1493572680|-0.0003813824|1493572695|           SC|
+----------+-------------+----------+-------------+

И вот что я написал до сих пор:

# Read each file one by one, resample, forwardfill and join
resample_unit_sec=60
source_path = 'hdfs://s-mac/user/waqas/parquet/tag={}'
files = ['SA', 'SB', 'SC']

for file in files:
    parq_df = spark.read.parquet(source_bucket.format(file))
    epoch = (F.col("timestamp").cast("bigint") / resample_unit_sec).cast("bigint") * resample_unit_sec
    with_epoch = parq_df.withColumn("epoch", epoch)
    min_epoch, max_epoch = with_epoch.select(F.min("epoch"), F.max("epoch")).first()
    ref = spark.range(min_epoch, max_epoch + 1, resample_unit_sec).toDF("epoch")
    final = ref.join(with_epoch, "epoch", "left").orderBy("epoch").withColumn("TS_resampled", F.col("epoch").cast("timestamp"))
    dfs.append(final.cache())

for each df:
    window_ff = Window.orderBy('TS_resampled').rowsBetween(-sys.maxsize, 0)
    ffilled_column = F.last(df['value'], ignorenulls=True).over(window_ff)
    filled_final = df.withColumn('ffilled', ffilled_column)
    filled_final = filled_final.groupBy(['TS_resampled']).pivot('tag').agg({'ffilled':'mean'}).orderBy('TS_resampled')
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...