Как эффективно получить одну строку для каждого минимального или максимального значения в каждом столбце кадра данных в Pyspark? - PullRequest
1 голос
/ 23 октября 2019

Я пытаюсь уменьшить большой набор данных до строк, имеющих минимальные и максимальные значения для каждого столбца. Другими словами, я хотел бы, чтобы для каждого столбца этого набора данных была получена одна строка с минимальным значением в этом столбце, а также другая строка с максимальным значением в том же столбце. Я должен отметить, что я не знаю заранее, какие столбцы будут иметь этот набор данных. Вот пример:

+----+----+----+     +----+----+----+
|Col1|Col2|Col3| ==> |Col1|Col2|Col3|
+----+----+----+     +----+----+----+
| F  | 99 | 17 |     | A  | 34 | 25 |
| M  | 32 | 20 |     | Z  | 51 | 49 |
| D  |  2 | 84 |     | D  |  2 | 84 |
| H  | 67 | 90 |     | F  | 99 | 17 |
| P  | 54 | 75 |     | C  | 18 |  9 |
| C  | 18 |  9 |     | H  | 67 | 90 |
| Z  | 51 | 49 |     +----+----+----+
| A  | 34 | 25 |
+----+----+----+

Первая строка выбрана, потому что A является наименьшим значением в Col1. Второе, потому что Z - самое большое значение на Col1. Третье, потому что 2 наименьшее на Col2, и так далее. Код ниже, кажется, делает правильные вещи (поправьте меня, если я ошибаюсь), но производительность низка. Я начинаю с получения фрейма данных из случайного файла .csv:

input_file = (sqlContext.read
    .format("csv")
    .options(header="true", inferSchema="true", delimiter=";", charset="UTF-8")
    .load("/FileStore/tables/random.csv")
)

Затем я создаю два других фрейма данных, каждый из которых имеет одну строку с минимальными и, соответственно, максимальными значениями каждого столбца:

from pyspark.sql.functions import col, min, max

min_values = input_file.select(
    *[min(col(col_name)).name(col_name) for col_name in input_file.columns]
)

max_values = input_file.select(
    *[max(col(col_name)).name(col_name) for col_name in input_file.columns]
)

Наконец, я неоднократно присоединяю исходный входной файл к этим двум фреймам данных, содержащим минимальное и максимальное значения, по очереди используя каждый столбец, и объединяю все результаты.

min_max_rows = (
  input_file
    .join(min_values, input_file[input_file.columns[0]] == min_values[input_file.columns[0]])
    .select(input_file["*"]).limit(1)
    .union(
      input_file
        .join(max_values, input_file[input_file.columns[0]] == max_values[input_file.columns[0]])
        .select(input_file["*"]).limit(1)
    )
)

for c in input_file.columns[1:]:
  min_max_rows = min_max_rows.union(
    input_file
      .join(min_values, input_file[c] == min_values[c])
      .select(input_file["*"]).limit(1)
      .union(
        input_file
          .join(max_values, input_file[c] == max_values[c])
          .select(input_file["*"]).limit(1)
      )
  )

min_max_rows.dropDuplicates()

Для моего тестаНабор данных из 500 тыс. строк, 40 столбцов, выполнение всего этого занимает около 7-8 минут в стандартном кластере Databricks. Я должен регулярно просеивать более 20 раз этот объем данных. Есть ли способ оптимизировать этот код? Я очень боюсь, что принял наивный подход к этому, так как я довольно новичок в Spark.

Спасибо!

1 Ответ

0 голосов
/ 24 октября 2019

Не кажется популярным вопросом, но интересным (для меня). И много работы за 15 баллов. На самом деле я ошибся в первый раз.

Вот масштабируемое решение, которое вы можете разделить соответствующим образом, чтобы увеличить пропускную способность.

Трудно объяснить, ключевым моментом здесь является манипулирование данными и транспонирование данных, а также некоторое боковое мышление.

Я не фокусировался на переменных столбцах всех видовтипов данных. Это должно быть решено самостоятельно, может быть сделано, но некоторые, если еще логика требуется, чтобы проверить, является ли альфа или двойной или числовой. Смешивание типов данных и применение к вещам становится проблематичным, но может быть решено. Я дал понятие num_string, но не завершил его.

Я сосредоточился на проблеме масштабируемости и подходе с меньшей процедурной логикой. Меньший размер выборки со всеми числами, но, насколько я вижу, исправьте. Общий принцип есть.

Попробуйте. Успех.

Код:

from pyspark.sql.functions import *
from pyspark.sql.types import *

def reshape(df, by):
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    kvs = explode(array([
      struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

df1 = spark.createDataFrame(
[(4, 15, 3), (200, 100, 25), (7, 16, 4)], ("c1", "c2", "c3"))
df1 = df1.withColumn("rowId", monotonically_increasing_id())                                   
df1.cache  
df1.show()

df2 = reshape(df1, ["rowId"])
df2.show()

# In case you have other types like characters in the other column - not focusing on that aspect
df3 = df2.withColumn("num_string", format_string("%09d", col("val")))

# Avoid column name issues.
df3 = df3.withColumn("key1", col("key"))
df3.show()

df3 = df3.groupby('key1').agg(min(col("val")).alias("min_val"), max(col("val")).alias("max_val"))
df3.show()

df4 = df3.join(df2, df3.key1 == df2.key)
new_column_condition = expr(
    """IF(val = min_val, -1, IF(val = max_val, 1, 0))"""
)
df4 = df4.withColumn("col_class", new_column_condition)
df4.show()

df5 = df4.filter(  '(min_val = val or max_val = val) and col_class <> 0'  ) 
df5.show()

df6 = df5.join(df1, df5.rowId == df1.rowId)
df6.show()

df6.select([c for c in df6.columns if c in ['c1','c2', 'c3']]).distinct().show()  

Returns:

+---+---+---+
| c1| c2| c3|
+---+---+---+
|  4| 15|  3|
|200|100| 25|
+---+---+---+

Данные приводят ключ к разгадке.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...