PySpark: как добавить фреймы данных в цикл For - PullRequest
1 голос
/ 29 мая 2019

Я выполняю скользящее среднее вычисление для отдельных временных рядов временных рядов, затем хочу объединить / добавить результаты.

# UDF for rolling median
median_udf = udf(lambda x: float(np.median(x)), FloatType())

series_list = ['0620', '5914']
SeriesAppend=[]

for item in series_list:
    # Filter for select item
    series = test_df.where(col("ID").isin([item]))
    # Sort time series
    series_sorted = series.sort(series.ID, 
    series.date).persist()
    # Calculate rolling median
    series_sorted = series_sorted.withColumn("list", 
        collect_list("metric").over(w)) \
        .withColumn("rolling_median", median_udf("list"))

    SeriesAppend.append(series_sorted)

SeriesAppend

[DataFrame [ntwrk_genre_cd: строка, дата: дата, mkt_cd: строка,syscode: строка, ntwrk_cd: строка, syscode_ntwrk: строка, метрика: double, список: массив, Rolling_median: float], DataFrame [ntwrk_genre_cd: строка, дата: дата, mkt_cd: строка, syscode: строка, ntwrk_cd: строка, sys, метрика: двойная, список: массив, roll_median: float]]

Когда я пытаюсь .show ():

'list' object has no attribute 'show'
Traceback (most recent call last):
AttributeError: 'list' object has no attribute 'show'

Я понимаю, что это говорит о том, что объект является список фреймов данных .Как преобразовать в один фрейм данных?

Я знаю, что следующее решение работает для явного числа фреймов данных, но я хочу, чтобы цикл for не зависел от количества фреймов данных:

from functools import reduce
from pyspark.sql import DataFrame

dfs = [df1,df2,df3]
df = reduce(DataFrame.unionAll, dfs)

Есть ли способ обобщить это для неявных имен фреймов данных?

1 Ответ

1 голос
/ 29 мая 2019

Спасибо всем! Подводя итог - решение использует Reduce и unionAll:

SeriesAppend=[]

for item in series_list:
    # Filter for select item
    series = test_df.where(col("ID").isin([item]))
    # Sort time series
    series_sorted = series.sort(series.ID, 
    series.date).persist()
    # Calculate rolling median
    series_sorted = series_sorted.withColumn("list", 
         collect_list("metric").over(w)) \
         .withColumn("rolling_median", median_udf("list"))

    SeriesAppend.append(series_sorted)

df_series = reduce(DataFrame.unionAll, SeriesAppend)
...