Применить функцию ко всем ячейкам в Spark DataFrame - PullRequest
0 голосов
/ 02 февраля 2019

Я пытаюсь преобразовать некоторый код Pandas в Spark для масштабирования.myfunc - это оболочка для сложного API, который принимает строку и возвращает новую строку (что означает, что я не могу использовать векторизованные функции).

def myfunc(ds):
    for attribute, value in ds.items():
        value = api_function(attribute, value)
        ds[attribute] = value
    return ds

df = df.apply(myfunc, axis='columns')

myfunc принимает DataSeries, разбивает его наотдельные ячейки, вызывает API для каждой ячейки и создает новый DataSeries с теми же именами столбцов.Это эффективно изменяет все ячейки в DataFrame.

Я новичок в Spark и хочу перевести эту логику, используя pyspark.Я преобразовал свой DataFrame для панд в Spark:

spark = SparkSession.builder.appName('My app').getOrCreate()
spark_schema = StructType([StructField(c, StringType(), True) for c in df.columns])
spark_df = spark.createDataFrame(df, schema=spark_schema)

Здесь я заблудился.Нужен ли UDF, pandas_udf?Как мне перебрать все ячейки и вернуть новую строку для каждой, используя myfunc?spark_df.foreach() ничего не возвращает и не имеет функции map().

Я могу изменить myfunc с DataSeries -> DataSeries на string -> stringпри необходимости.

Ответы [ 2 ]

0 голосов
/ 12 февраля 2019

Решение:

udf_func = udf(func, StringType())
for col_name in spark_df.columns:
    spark_df = spark_df.withColumn(col_name, udf_func(lit(col_name), col_name))
return spark_df.toPandas()

Есть 3 ключевых момента, которые помогли мне понять это:

  1. Если вы используете withColumn с именем существующего столбца(col_name), Spark "перезаписывает" / затеняет исходный столбец.По сути, это создает видимость непосредственного редактирования столбца, как если бы он был изменяемым.
  2. Создав цикл для исходных столбцов и повторно используя одну и ту же переменную DataFrame spark_df, я использую тот же принцип для имитации изменяемого DataFrame.создавая цепочку преобразований по столбцам, каждый раз «перезаписывая» столбец (для # 1 - см. ниже)
  3. Spark UDFs ожидает, что все параметры будут Column типов, что означает, что он пытаетсяРазрешить значения столбцов для каждого параметра.Поскольку первый параметр api_function является литеральным значением, которое будет одинаковым для всех строк в векторе, вы должны использовать функцию lit().Простая передача col_name в функцию попытается извлечь значения столбца для этого столбца.Насколько я могу сказать, передача col_name эквивалентна передаче col(col_name).

Если предположить, что 3 столбца 'a', 'b' и 'c', развертывание этой концепции будет выглядеть следующим образом:

spark_df = spark_df.withColumn('a', udf_func(lit('a'), 'a')
                   .withColumn('b', udf_func(lit('b'), 'b')
                   .withColumn('c', udf_func(lit('c'), 'c')
0 голосов
/ 02 февраля 2019

Вариант 1: использовать UDF для одного столбца за раз

Самый простой подход - переписать вашу функцию, чтобы взять строку в качестве аргумента (чтобы она была string -> string) и использовать UDF.Вот хороший пример здесь .Это работает по одному столбцу за раз.Таким образом, если ваш DataFrame имеет разумное количество столбцов, вы можете применить UDF к каждому столбцу по одному:

from pyspark.sql.functions import col
new_df = df.select(udf(col("col1")), udf(col("col2")), ...)

Пример

df = sc.parallelize([[1, 4], [2,5], [3,6]]).toDF(["col1", "col2"])
df.show()
+----+----+
|col1|col2|
+----+----+
|   1|   4|
|   2|   5|
|   3|   6|
+----+----+

def plus1_udf(x):
    return x + 1
plus1 = spark.udf.register("plus1", plus1_udf)

new_df = df.select(plus1(col("col1")), plus1(col("col2")))
new_df.show()
+-----------+-----------+
|plus1(col1)|plus1(col2)|
+-----------+-----------+
|          2|          5|
|          3|          6|
|          4|          7|
+-----------+-----------+

Вариант 2: Картавесь DataFrame за один раз

map доступен для Scala DataFrame s, но на данный момент не в PySpark.API нижнего уровня RDD имеет функцию map в PySpark.Поэтому, если у вас слишком много столбцов для преобразования по одному, вы можете работать с каждой отдельной ячейкой в ​​DataFrame следующим образом:

def map_fn(row):
    return [api_function(x) for (column, x) in row.asDict().items()

column_names = df.columns
new_df = df.rdd.map(map_fn).toDF(df.columns)

Пример

df = sc.parallelize([[1, 4], [2,5], [3,6]]).toDF(["col1", "col2"])
def map_fn(row):
   return [value + 1 for (_, value) in row.asDict().items()]

columns = df.columns
new_df = df.rdd.map(map_fn).toDF(columns)
new_df.show()
+----+----+
|col1|col2|
+----+----+
|   2|   5|
|   3|   6|
|   4|   7|
+----+----+

Контекст

В документации из foreach приведен только пример печати, но мы можем проверить по коду , что он действительно ничего не возвращает.

Вы можете прочитать о pandas_udf в этом посте , но кажется, что он больше всего подходит для векторизованных функций, которые, как вы указали, вы не можете использовать из-за api_function.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...