Прежде всего, я хочу сообщить вам, что я все еще очень новичок в искусстве и привык к концепции ленивых вычислений.
Вот мой выпуск:
У меня есть две искрыФреймы данных, которые я загружаю при чтении файлов CSV.GZ.Я пытаюсь объединить обе таблицы, чтобы разделить первую таблицу по ключам, которые у меня есть на второй.
Например:
Таблица A
+----------+---------+--------+---------+------+
| Date| Zone| X| Type|Volume|
+----------+---------+--------+---------+------+
|2019-01-16|010010000| B| A| 684|
|2019-01-16|010020000| B| A| 21771|
|2019-01-16|010030000| B| A| 7497|
|2019-01-16|010040000| B| A| 74852|
Таблица B
+----+---------+
|Dept| Zone|
+----+---------+
| 01|010010000|
| 02|010020000|
| 01|010030000|
| 02|010040000|
Затем, когда я объединяю обе таблицы, у меня есть:
+---------+----------+--------+---------+------+----+
| Zone| Date| X| Type|Volume|Dept|
+---------+----------+--------+---------+------+----+
|010010000|2019-01-16| B| A| 684| 01|
|010020000|2019-01-16| B| A| 21771| 02|
|010030000|2019-01-16| B| A| 7497| 01|
|010040000|2019-01-16| B| A| 74852| 02|
Итак, я хочу разделить эту таблицу на Y разделенных таблиц, гдеY - это число различных значений 'Dept', которые я нахожу в объединенной таблице.
Так, например:
Результат1:
+---------+----------+--------+---------+------+----+
| Zone| Date| X| Type|Volume|Dept|
+---------+----------+--------+---------+------+----+
|010010000|2019-01-16| B| A| 684| 01|
|010030000|2019-01-16| B| A| 7497| 01|
Результат2:
+---------+----------+--------+---------+------+----+
| Zone| Date| X| Type|Volume|Dept|
+---------+----------+--------+---------+------+----+
|010020000|2019-01-16| B| A| 21771| 02|
|010040000|2019-01-16| B| A| 74852| 02|
Мой код выглядит следующим образом:
sp_df_A = spark.read.csv(file_path_A, header=True, sep=';', encoding='cp1252')
sp_df_B = spark.read.csv(file_path_B, header=True, sep=';', encoding='cp1252')
sp_merged_df = sp_df_A.join(sp_df_B, on=['Zone'], how='left')
# list of unique 'Dept' values on the merged DataFrame
unique_buckets = [x.__getitem__('Dept') for x in sp_merged_df.select('Dept').distinct().collect()]
# Iterate over all 'Dept' found
for zone_bucket in unique_buckets:
print(zone_bucket)
bucket_dir = os.path.join(output_dir, 'Zone_%s' % zone_bucket)
if not os.path.exists(bucket_dir):
os.mkdir(bucket_dir)
# Filter target 'Dept'
tmp_df = sp_merged_df.filter(sp_merged_df['Dept'] == zone_bucket)
# write result
tmp_df.write.format('com.databricks.spark.csv').option('codec', 'org.apache.hadoop.io.compress.GzipCodec').save(bucket_dir, header = 'true')
Дело в том, что этот очень простой код занимает слишком много времени, чтобы написать результат.Поэтому я предполагаю, что ленивая оценка загружает, объединяет и фильтрует каждый цикл цикла.
Может ли это быть так?