Best
В данный момент я экспериментирую с pyspark pandas_udf , но, к сожалению, у меня возникают некоторые проблемы, когда я возвращаю DataFrame с: NA, None или NaNs в нем.Если я использую FloatType, то результат будет okey, но как только я использую IntegerType, TimestampType и т.д. ... я получаю сообщение об ошибке, и оно больше не работает.
Вот некоторыепримеры того, что работает и что не работает:
What does work?
Пример 1)
custom_schema = StructType([
StructField('User',StringType(),True),
StructField('Sport',StringType(),True),
StructField('Age',IntegerType(),True),
StructField('Age_lag',FloatType(),True),
])
# the schema is what it needs as an output format
@pandas_udf(custom_schema, PandasUDFType.GROUPED_MAP)
def my_custom_function(pdf):
# Input/output are both a pandas.DataFrame
#return a totalaly different DataFrame
dt = pd.DataFrame({'User': ['Alice', 'Bob'], 'Sport': ['Football', 'Basketball'], 'Age': [27, 34]})
dt['Age_lag'] = dt['Age'].shift(1)
return dt
df.groupby('id').apply(my_custom_function).toPandas()
Результат:
User Sport Age Age_lag
0 Alice Football 27 NaN
1 Bob Basketball 34 27.0
2 Alice Football 27 NaN
3 Bob Basketball 34 27.0
Пример 2)
Если мы изменим Тип из Age_lag в IntegerType () и заполните Na на -1, тогда у нас еще будет действительный результат (без NaNs)
custom_schema = StructType([
StructField('User',StringType(),True),
StructField('Sport',StringType(),True),
StructField('Age',IntegerType(),True),
StructField('Age_lag',IntegerType(),True),
])
# the schema is what it needs as an output format
@pandas_udf(custom_schema, PandasUDFType.GROUPED_MAP)
def my_custom_function(pdf):
# Input/output are both a pandas.DataFrame
#return a totalaly different DataFrame
dt = pd.DataFrame({'User': ['Alice', 'Bob'], 'Sport': ['Football', 'Basketball'], 'Age': [27, 34]})
dt['Age_lag'] = dt['Age'].shift(1).fillna(-1)
return dt
df.groupby('id').apply(my_custom_function).toPandas()
Результат:
User Sport Age Age_lag
0 Alice Football 27 -1
1 Bob Basketball 34 27
2 Alice Football 27 -1
3 Bob Basketball 34 27
Что не работает?
Пример 3)
Если пропустить .fillna (-1) , томы получаем следующую ошибку
custom_schema = StructType([
StructField('User',StringType(),True),
StructField('Sport',StringType(),True),
StructField('Age',IntegerType(),True),
StructField('Age_lag',IntegerType(),True),
])
# the schema is what it needs as an output format
@pandas_udf(custom_schema, PandasUDFType.GROUPED_MAP)
def my_custom_function(pdf):
# Input/output are both a pandas.DataFrame
#return a totalaly different DataFrame
dt = pd.DataFrame({'User': ['Alice', 'Bob'], 'Sport': ['Football', 'Basketball'], 'Age': [27, 34]})
dt['Age_lag'] = dt['Age'].shift(1)
return dt
df.groupby('id').apply(my_custom_function).toPandas()
Результат: pyarrow.lib.ArrowInvalid: значение с плавающей запятой усечено
Пример 4)
И последнее, но не менее важное: если мы просто отправим статический фрейм данных туда, где age_lag содержит None , то он тоже не будет работать.
from pyspark.sql.types import StructType,NullType, StructField,FloatType, LongType, DoubleType, StringType, IntegerType
# true means, accepts nulls
custom_schema = StructType([
StructField('User',StringType(),True),
StructField('Sport',StringType(),True),
StructField('Age',IntegerType(),True),
StructField('Age_lag',IntegerType(),True),
])
# the schema is what it needs as an output format
@pandas_udf(custom_schema, PandasUDFType.GROUPED_MAP)
def my_custom_function(pdf):
# Input/output are both a pandas.DataFrame
#return a totalaly different DataFrame
dt = pd.DataFrame({'User': ['Alice', 'Bob'],
'Sport': ['Football', 'Basketball'],
'Age': [27, 34],
'Age_lag': [27, None]})
return dt
df.groupby('id').apply(my_custom_function).toPandas()
Вопросы :
- Как вы справляетесь с этим?
- Это плохой дизайн?
- (потому что я могу представить 1000 случаев, когда я хочу вернуть NaN и None)
- Нужно ли нам заполнять все пропущенные значения?и заменить их потом?или работать с поплавками вместо целых чисел?так далее?
- это будет решено в ближайшем будущем?(потому что pandas_udf s довольно новый)