Pyspark ленивая оценка в петлях слишком медленно - PullRequest
0 голосов
/ 22 января 2019

Прежде всего, я хочу сообщить вам, что я все еще очень новичок в искусстве и привык к концепции ленивых вычислений.

Вот мой выпуск:

У меня есть две искрыФреймы данных, которые я загружаю при чтении файлов CSV.GZ.Я пытаюсь объединить обе таблицы, чтобы разделить первую таблицу по ключам, которые у меня есть на второй.

Например:

Таблица A

+----------+---------+--------+---------+------+
|      Date|     Zone|       X|     Type|Volume|
+----------+---------+--------+---------+------+
|2019-01-16|010010000|       B|        A|   684|
|2019-01-16|010020000|       B|        A| 21771|
|2019-01-16|010030000|       B|        A|  7497|
|2019-01-16|010040000|       B|        A| 74852|

Таблица B

+----+---------+
|Dept|     Zone|
+----+---------+
|  01|010010000|
|  02|010020000|
|  01|010030000|
|  02|010040000|

Затем, когда я объединяю обе таблицы, у меня есть:

+---------+----------+--------+---------+------+----+
|     Zone|      Date|       X|     Type|Volume|Dept|
+---------+----------+--------+---------+------+----+
|010010000|2019-01-16|       B|        A|   684|  01|
|010020000|2019-01-16|       B|        A| 21771|  02|
|010030000|2019-01-16|       B|        A|  7497|  01|
|010040000|2019-01-16|       B|        A| 74852|  02|

Итак, я хочу разделить эту таблицу на Y разделенных таблиц, гдеY - это число различных значений 'Dept', которые я нахожу в объединенной таблице.

Так, например:

Результат1:

+---------+----------+--------+---------+------+----+
|     Zone|      Date|       X|     Type|Volume|Dept|
+---------+----------+--------+---------+------+----+
|010010000|2019-01-16|       B|        A|   684|  01|
|010030000|2019-01-16|       B|        A|  7497|  01|

Результат2:

+---------+----------+--------+---------+------+----+
|     Zone|      Date|       X|     Type|Volume|Dept|
+---------+----------+--------+---------+------+----+
|010020000|2019-01-16|       B|        A| 21771|  02|
|010040000|2019-01-16|       B|        A| 74852|  02|

Мой код выглядит следующим образом:

sp_df_A = spark.read.csv(file_path_A, header=True, sep=';', encoding='cp1252')
sp_df_B = spark.read.csv(file_path_B, header=True, sep=';', encoding='cp1252')

sp_merged_df = sp_df_A.join(sp_df_B, on=['Zone'], how='left')


# list of unique 'Dept' values on the merged DataFrame
unique_buckets = [x.__getitem__('Dept') for x in sp_merged_df.select('Dept').distinct().collect()]


# Iterate over all 'Dept' found
for zone_bucket in unique_buckets:
    print(zone_bucket)
    bucket_dir = os.path.join(output_dir, 'Zone_%s' % zone_bucket)
    if not os.path.exists(bucket_dir):
        os.mkdir(bucket_dir)
    # Filter target 'Dept'
    tmp_df = sp_merged_df.filter(sp_merged_df['Dept'] == zone_bucket)
    # write result
    tmp_df.write.format('com.databricks.spark.csv').option('codec', 'org.apache.hadoop.io.compress.GzipCodec').save(bucket_dir, header = 'true')

Дело в том, что этот очень простой код занимает слишком много времени, чтобы написать результат.Поэтому я предполагаю, что ленивая оценка загружает, объединяет и фильтрует каждый цикл цикла.

Может ли это быть так?

1 Ответ

0 голосов
/ 22 января 2019

Ваше предположение верно.Ваш код читает, объединяет и фильтрует все данные для каждого из блоков.Это действительно вызвано ленивой оценкой искры.

Spark ожидает любого преобразования данных, пока не будет выполнено действие.Когда действие вызывается, spark просматривает все преобразования и создает план того, как эффективно получить результаты действия.Пока спарк выполняет этот план, программа держится.Когда искра завершена, программа продолжается, и спарк «забывает» обо всем, что было сделано, до тех пор, пока не будет вызвано следующее действие.

В вашем случае искра «забывает» объединенный фрейм данных sp_merged_df, и каждый раз, когда вызывается .collect() или .save(), он восстанавливает его.

Если вы хотите, чтобы Spark «запоминал» RDD или DataFrame, вы можете .cache() его (см. docs ).

...