Я использую AWS EMR (5.29) для запуска задания pyspark, но получаю эту ошибку, когда я apply
a pandas udf.
pyarrow.lib.ArrowInvalid: Input object was not a NumPy array
Вот фиктивный код для репликации проблемы.
import pyspark.sql.functions as F
from pyspark.sql.types import *
df = spark.createDataFrame([
(1, "A", "X1"),
(2, "B", "X2"),
(3, "B", "X3"),
(1, "B", "X3"),
(2, "C", "X2"),
(3, "C", "X2"),
(1, "C", "X1"),
(1, "B", "X1"),
], ["id", "type", "code"])
и вот фиктивный udf
schema = StructType([
StructField("code", StringType()),
])
@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def dummy_udaf(pdf):
pdf = pdf[['code']]
return pdf
, и когда я запускаю эту строку,
df.groupby('type').apply(dummy_udaf).show()
я получаю это ошибка:
An error occurred while calling o149.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 64 in stage 12.0 failed 4 times, most recent failure: Lost task 64.3 in stage 12.0 (TID 66, ip-10-161-108-245.vpc.internal, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/serializers.py", line 287, in dump_stream
batch = _create_batch(series, self._timezone)
File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/serializers.py", line 256, in _create_batch
arrs = [create_array(s, t) for s, t in series]
File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/serializers.py", line 256, in <listcomp>
arrs = [create_array(s, t) for s, t in series]
File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/serializers.py", line 254, in create_array
return pa.Array.from_pandas(s, mask=mask, type=t, safe=False)
File "pyarrow/array.pxi", line 755, in pyarrow.lib.Array.from_pandas
File "pyarrow/array.pxi", line 265, in pyarrow.lib.array
File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
File "pyarrow/error.pxi", line 84, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Input object was not a NumPy array
Попытка использовать пониженную версию стрелки, как было предложено здесь , а также отключила оптимизацию пиарроу как предложено здесь , но ничего не помогло.