Синтаксис, который вы используете для pandas
DataFrame.Чтобы достичь этого для spark
DataFrame, вы должны использовать метод withColumn()
.Это прекрасно работает для широкого спектра четко определенных функций DataFrame , но это немного сложнее для пользовательских функций отображения.
Общий случай
Чтобы определить udf
, необходимо указать тип выходных данных.Например, если вы хотите применить функцию my_func
, которая возвращает string
, вы можете создать udf
следующим образом:
import pyspark.sql.functions as f
my_udf = f.udf(my_func, StringType())
Затем вы можете использовать my_udf
для создания новогостолбец типа:
df = df.withColumn('new_column', my_udf(f.col("some_column_name")))
Другой вариант заключается в использовании select
:
df = df.select("*", my_udf(f.col("some_column_name")).alias("new_column"))
Определенная проблема
Использование udf
В вашем конкретном случае вы хотите использовать словарь для перевода значений вашего DataFrame.
Вот способ определения udf
для этой цели:
some_map_udf = f.udf(lambda x: some_map.get(x, None), IntegerType())
Обратите внимание, что я использовал dict.get()
, потому что вы хотите, чтобы ваш udf
был устойчив к плохим входам.
df = df.withColumn('new_column', some_map_udf(f.col("some_column_name")))
Использование функций DataFrame
Иногда использование udf
неизбежно, но, по возможности, использование функций DataFrame обычно предпочтительнее.
Вот один из вариантов сделать то же самое без использования udf
.
Хитрость заключается в том, чтобы перебирать элементы в some_map
, чтобы создать список функций pyspark.sql.functions.when()
.
some_map_func = [f.when(f.col("some_column_name") == k, v) for k, v in some_map.items()]
print(some_map_func)
#[Column<CASE WHEN (some_column_name = a) THEN 0 END>,
# Column<CASE WHEN (some_column_name = c) THEN 1 END>,
# Column<CASE WHEN (some_column_name = b) THEN 1 END>]
Теперь вы можете использовать pyspark.sql.functions.coalesce()
внутри выбора:
df = df.select("*", f.coalesce(*some_map_func).alias("some_column_name"))
Это работает, потому что when()
возвращает null
по умолчанию, еслиусловие не выполнено, и coalesce()
выберет первое ненулевое значение, с которым оно столкнется.Поскольку ключи карты уникальны, максимум один столбец будет ненулевым.