Вариант 1: использовать UDF для одного столбца за раз
Самый простой подход - переписать вашу функцию, чтобы взять строку в качестве аргумента (чтобы она была string
-> string
) и использовать UDF.Вот хороший пример здесь .Это работает по одному столбцу за раз.Таким образом, если ваш DataFrame
имеет разумное количество столбцов, вы можете применить UDF к каждому столбцу по одному:
from pyspark.sql.functions import col
new_df = df.select(udf(col("col1")), udf(col("col2")), ...)
Пример
df = sc.parallelize([[1, 4], [2,5], [3,6]]).toDF(["col1", "col2"])
df.show()
+----+----+
|col1|col2|
+----+----+
| 1| 4|
| 2| 5|
| 3| 6|
+----+----+
def plus1_udf(x):
return x + 1
plus1 = spark.udf.register("plus1", plus1_udf)
new_df = df.select(plus1(col("col1")), plus1(col("col2")))
new_df.show()
+-----------+-----------+
|plus1(col1)|plus1(col2)|
+-----------+-----------+
| 2| 5|
| 3| 6|
| 4| 7|
+-----------+-----------+
Вариант 2: Картавесь DataFrame за один раз
map
доступен для Scala DataFrame
s, но на данный момент не в PySpark.API нижнего уровня RDD имеет функцию map
в PySpark.Поэтому, если у вас слишком много столбцов для преобразования по одному, вы можете работать с каждой отдельной ячейкой в DataFrame
следующим образом:
def map_fn(row):
return [api_function(x) for (column, x) in row.asDict().items()
column_names = df.columns
new_df = df.rdd.map(map_fn).toDF(df.columns)
Пример
df = sc.parallelize([[1, 4], [2,5], [3,6]]).toDF(["col1", "col2"])
def map_fn(row):
return [value + 1 for (_, value) in row.asDict().items()]
columns = df.columns
new_df = df.rdd.map(map_fn).toDF(columns)
new_df.show()
+----+----+
|col1|col2|
+----+----+
| 2| 5|
| 3| 6|
| 4| 7|
+----+----+
Контекст
В документации из foreach
приведен только пример печати, но мы можем проверить по коду , что он действительно ничего не возвращает.
Вы можете прочитать о pandas_udf
в этом посте , но кажется, что он больше всего подходит для векторизованных функций, которые, как вы указали, вы не можете использовать из-за api_function
.