Я пытаюсь уменьшить большой набор данных до строк, имеющих минимальные и максимальные значения для каждого столбца. Другими словами, я хотел бы, чтобы для каждого столбца этого набора данных была получена одна строка с минимальным значением в этом столбце, а также другая строка с максимальным значением в том же столбце. Я должен отметить, что я не знаю заранее, какие столбцы будут иметь этот набор данных. Вот пример:
+----+----+----+ +----+----+----+
|Col1|Col2|Col3| ==> |Col1|Col2|Col3|
+----+----+----+ +----+----+----+
| F | 99 | 17 | | A | 34 | 25 |
| M | 32 | 20 | | Z | 51 | 49 |
| D | 2 | 84 | | D | 2 | 84 |
| H | 67 | 90 | | F | 99 | 17 |
| P | 54 | 75 | | C | 18 | 9 |
| C | 18 | 9 | | H | 67 | 90 |
| Z | 51 | 49 | +----+----+----+
| A | 34 | 25 |
+----+----+----+
Первая строка выбрана, потому что A является наименьшим значением в Col1. Второе, потому что Z - самое большое значение на Col1. Третье, потому что 2 наименьшее на Col2, и так далее. Код ниже, кажется, делает правильные вещи (поправьте меня, если я ошибаюсь), но производительность низка. Я начинаю с получения фрейма данных из случайного файла .csv:
input_file = (sqlContext.read
.format("csv")
.options(header="true", inferSchema="true", delimiter=";", charset="UTF-8")
.load("/FileStore/tables/random.csv")
)
Затем я создаю два других фрейма данных, каждый из которых имеет одну строку с минимальными и, соответственно, максимальными значениями каждого столбца:
from pyspark.sql.functions import col, min, max
min_values = input_file.select(
*[min(col(col_name)).name(col_name) for col_name in input_file.columns]
)
max_values = input_file.select(
*[max(col(col_name)).name(col_name) for col_name in input_file.columns]
)
Наконец, я неоднократно присоединяю исходный входной файл к этим двум фреймам данных, содержащим минимальное и максимальное значения, по очереди используя каждый столбец, и объединяю все результаты.
min_max_rows = (
input_file
.join(min_values, input_file[input_file.columns[0]] == min_values[input_file.columns[0]])
.select(input_file["*"]).limit(1)
.union(
input_file
.join(max_values, input_file[input_file.columns[0]] == max_values[input_file.columns[0]])
.select(input_file["*"]).limit(1)
)
)
for c in input_file.columns[1:]:
min_max_rows = min_max_rows.union(
input_file
.join(min_values, input_file[c] == min_values[c])
.select(input_file["*"]).limit(1)
.union(
input_file
.join(max_values, input_file[c] == max_values[c])
.select(input_file["*"]).limit(1)
)
)
min_max_rows.dropDuplicates()
Для моего тестаНабор данных из 500 тыс. строк, 40 столбцов, выполнение всего этого занимает около 7-8 минут в стандартном кластере Databricks. Я должен регулярно просеивать более 20 раз этот объем данных. Есть ли способ оптимизировать этот код? Я очень боюсь, что принял наивный подход к этому, так как я довольно новичок в Spark.
Спасибо!