Ошибка Pyarrow: при запуске pandas udf в pyspark - PullRequest
1 голос
/ 24 марта 2020

Я использую AWS EMR (5.29) для запуска задания pyspark, но получаю эту ошибку, когда я apply a pandas udf.

pyarrow.lib.ArrowInvalid: Input object was not a NumPy array

Вот фиктивный код для репликации проблемы.

import pyspark.sql.functions as F
from pyspark.sql.types import *

df = spark.createDataFrame([
    (1, "A", "X1"),
    (2, "B", "X2"),
    (3, "B", "X3"),
    (1, "B", "X3"),
    (2, "C", "X2"),
    (3, "C", "X2"),
    (1, "C", "X1"),
    (1, "B", "X1"),
], ["id", "type", "code"])

и вот фиктивный udf

schema = StructType([
    StructField("code", StringType()),
])


@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def dummy_udaf(pdf):
    pdf = pdf[['code']]
    return pdf

, и когда я запускаю эту строку,

df.groupby('type').apply(dummy_udaf).show()

я получаю это ошибка:

An error occurred while calling o149.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 64 in stage 12.0 failed 4 times, most recent failure: Lost task 64.3 in stage 12.0 (TID 66, ip-10-161-108-245.vpc.internal, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/serializers.py", line 287, in dump_stream
    batch = _create_batch(series, self._timezone)
  File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/serializers.py", line 256, in _create_batch
    arrs = [create_array(s, t) for s, t in series]
  File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/serializers.py", line 256, in <listcomp>
    arrs = [create_array(s, t) for s, t in series]
  File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/serializers.py", line 254, in create_array
    return pa.Array.from_pandas(s, mask=mask, type=t, safe=False)
  File "pyarrow/array.pxi", line 755, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 265, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 84, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Input object was not a NumPy array

Попытка использовать пониженную версию стрелки, как было предложено здесь , а также отключила оптимизацию пиарроу как предложено здесь , но ничего не помогло.

1 Ответ

2 голосов
/ 01 апреля 2020

У меня была та же проблема, что и у вас (на AWS EMR), и я смог ее решить, установив pyarrow==0.14.1. Я не знаю точно, почему это не сработало для вас, но одно предположение состоит в том, что вам нужно было выполнить эту установку в bootstrap сценарии , чтобы она произошла на всех машинах в вашем кластере. Недостаточно просто установить переменную среды в записной книжке, в которой вы работаете. Надеюсь, это вам поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...