Эффективная обработка столбцов в PySpark - PullRequest
0 голосов
/ 18 мая 2018

У меня есть фрейм данных с очень большим количеством столбцов (> 30000).

Я заполняю его 1 и 0 на основе первого столбца, например:

for column in list_of_column_names:
  df = df.withColumn(column, when(array_contains(df['list_column'], column), 1).otherwise(0))

Однако этот процесс занимает много времени.Есть ли способ сделать это более эффективно?Что-то подсказывает мне, что обработка столбцов может быть распараллелена.

Редактировать:

Пример входных данных

+----------------+-----+-----+-----+
|  list_column   | Foo | Bar | Baz |
+----------------+-----+-----+-----+
| ['Foo', 'Bak'] |     |     |     |
| ['Bar', Baz']  |     |     |     |
| ['Foo']        |     |     |     |
+----------------+-----+-----+-----+

Ответы [ 3 ]

0 голосов
/ 18 мая 2018

Вы можете подойти так,

import pyspark.sql.functions as F

exprs = [F.when(F.array_contains(F.col('list_column'), column), 1).otherwise(0).alias(column)\
                  for column in list_column_names]

df = df.select(['list_column']+exprs)
0 голосов
/ 18 мая 2018

withColumn уже распределен, поэтому будет сложно получить более быстрый подход, кроме того, который у вас уже есть .вы можете попробовать определить функцию udf следующим образом

from pyspark.sql import functions as f
from pyspark.sql import types as t

def containsUdf(listColumn):
    row = {}
    for column in list_of_column_names:
        if(column in listColumn):
            row.update({column: 1})
        else:
            row.update({column: 0})
    return row

callContainsUdf = f.udf(containsUdf, t.StructType([t.StructField(x, t.StringType(), True) for x in list_of_column_names]))

df.withColumn('struct', callContainsUdf(df['list_column']))\
    .select(f.col('list_column'), f.col('struct.*'))\
    .show(truncate=False)

, которая должна дать вам

+-----------+---+---+---+
|list_column|Foo|Bar|Baz|
+-----------+---+---+---+
|[Foo, Bak] |1  |0  |0  |
|[Bar, Baz] |0  |1  |1  |
|[Foo]      |1  |0  |0  |
+-----------+---+---+---+

Примечание: list_of_column_names = ["Foo","Bar","Baz"]

0 голосов
/ 18 мая 2018

В вашем коде нет ничего плохого, кроме очень широких данных:

for column in list_of_column_names:
    df = df.withColumn(...)

только генерирует план выполнения.

Фактическая обработка данных будет параллельной и распараллеленной после оценки результата.

Это, однако, дорогостоящий процесс, так как требует O (NMK) операций с N строками, M столбцов и K значений в списке.

Кроме того, планы выполнения для очень широких данных очень дороги для вычисления (хотя стоимость постоянна с точки зрения количества записей).Если это становится ограничивающим фактором, вам может быть лучше с RDDs:

  • Сортировать массив столбцов с помощью функции sort_array.
  • Преобразовать данные в RDD.
  • Применить поиск по каждому столбцу с помощью бинарного поиска.
...