«Объект DataFrame не имеет атрибута apply при попытке применить лямбду для создания нового столбца - PullRequest
0 голосов
/ 04 июня 2018

Я стремлюсь добавить новый столбец в Pandas DataFrame, но я сталкиваюсь со странной ошибкой.

Ожидается, что новый столбец будет преобразованием из существующего столбца, что можно сделать, выполнив поискв словаре / хэш-карте.

# Loading data
df = sqlContext.read.format(...).load(train_df_path)

# Instanciating the map
some_map = {
    'a': 0, 
    'b': 1,
    'c': 1,
}

# Creating a new column using the map
df['new_column'] = df.apply(lambda row: some_map(row.some_column_name), axis=1)

Что приводит к следующей ошибке:

AttributeErrorTraceback (most recent call last)
<ipython-input-12-aeee412b10bf> in <module>()
     25 df= train_df
     26 
---> 27 df['new_column'] = df.apply(lambda row: some_map(row.some_column_name), axis=1)

/usr/lib/spark/python/pyspark/sql/dataframe.py in __getattr__(self, name)
    962         if name not in self.columns:
    963             raise AttributeError(
--> 964                 "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
    965         jc = self._jdf.apply(name)
    966         return Column(jc)

AttributeError: 'DataFrame' object has no attribute 'apply'

Другая потенциально полезная информация: * Я использую Spark и Python 2.

Ответы [ 2 ]

0 голосов
/ 04 июня 2018

Синтаксис, который вы используете для pandas DataFrame.Чтобы достичь этого для spark DataFrame, вы должны использовать метод withColumn().Это прекрасно работает для широкого спектра четко определенных функций DataFrame , но это немного сложнее для пользовательских функций отображения.

Общий случай

Чтобы определить udf, необходимо указать тип выходных данных.Например, если вы хотите применить функцию my_func, которая возвращает string, вы можете создать udf следующим образом:

import pyspark.sql.functions as f
my_udf = f.udf(my_func, StringType())

Затем вы можете использовать my_udf для создания новогостолбец типа:

df = df.withColumn('new_column', my_udf(f.col("some_column_name")))

Другой вариант заключается в использовании select:

df = df.select("*", my_udf(f.col("some_column_name")).alias("new_column"))

Определенная проблема

Использование udf

В вашем конкретном случае вы хотите использовать словарь для перевода значений вашего DataFrame.

Вот способ определения udf для этой цели:

some_map_udf = f.udf(lambda x: some_map.get(x, None), IntegerType())

Обратите внимание, что я использовал dict.get(), потому что вы хотите, чтобы ваш udf был устойчив к плохим входам.

df = df.withColumn('new_column', some_map_udf(f.col("some_column_name")))

Использование функций DataFrame

Иногда использование udf неизбежно, но, по возможности, использование функций DataFrame обычно предпочтительнее.

Вот один из вариантов сделать то же самое без использования udf.

Хитрость заключается в том, чтобы перебирать элементы в some_map, чтобы создать список функций pyspark.sql.functions.when().

some_map_func = [f.when(f.col("some_column_name") == k, v) for k, v in some_map.items()]
print(some_map_func)
#[Column<CASE WHEN (some_column_name = a) THEN 0 END>,
# Column<CASE WHEN (some_column_name = c) THEN 1 END>,
# Column<CASE WHEN (some_column_name = b) THEN 1 END>]

Теперь вы можете использовать pyspark.sql.functions.coalesce() внутри выбора:

df = df.select("*", f.coalesce(*some_map_func).alias("some_column_name"))

Это работает, потому что when() возвращает null по умолчанию, еслиусловие не выполнено, и coalesce() выберет первое ненулевое значение, с которым оно столкнется.Поскольку ключи карты уникальны, максимум один столбец будет ненулевым.

0 голосов
/ 04 июня 2018

У вас есть фрейм данных искры, а не фрейм данных pandas.Чтобы добавить новый столбец в фрейм данных spark:

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
df = df.withColumn('new_column', F.udf(some_map.get, IntegerType())(some_column_name))
df.show()
...