Не могу применить скаляр pandas udf в pyspark - PullRequest
0 голосов
/ 02 апреля 2020

Я пытаюсь применить функцию python к кадру данных, используя pandas_udf. Это функция:

import html2text

class Udfs(object):
    def __init__(self):
        self.h2t = html2text.HTML2Text()
        self.h2t.ignore_links = True
        self.h2t.ignore_images = True

    def extract_text(self, raw_text):
        try:
            texto = h2t.handle(raw_text)
        except:
            texto = "PARSE HTML ERROR"
        return texto

Я применяю функцию extract_text в каждой серии pandas в udf следующим образом:

extract_text_udf = f.pandas_udf(lambda s : s.apply(udfs.extract_text), t.StringType())
df = df.withColumn("texto", extract_text_udf(f.col("html_raw")))

И затем я получаю следующая ошибка:

Traceback (most recent call last):
  File "process_info.py", line 70, in <module>
    row_count = info_df.count()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000001/pyspark.zip/pyspark/sql/dataframe.py", line 523, in count
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000001/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o123.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 2.0 failed 4 times, most recent failure: Lost task 5.3 in stage 2.0 (TID 2327, ip-10-2-6-163.eu-west-1.compute.internal, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000004/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000004/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000004/pyspark.zip/pyspark/serializers.py", line 287, in dump_stream
    batch = _create_batch(series, self._timezone)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000004/pyspark.zip/pyspark/serializers.py", line 256, in _create_batch
    arrs = [create_array(s, t) for s, t in series]
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000004/pyspark.zip/pyspark/serializers.py", line 256, in <listcomp>
    arrs = [create_array(s, t) for s, t in series]
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000004/pyspark.zip/pyspark/serializers.py", line 254, in create_array
    return pa.Array.from_pandas(s, mask=mask, type=t, safe=False)
  File "pyarrow/array.pxi", line 755, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 265, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 84, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Input object was not a NumPy array

Как применить функцию extract_text к моему фрейму данных, используя pandas_udf?

1 Ответ

0 голосов
/ 06 апреля 2020

Я нашел проблему. Проблема была в версии Apache Arrow. До версии 2.4.5 стрелка версии> 0.14.0 не работает.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...