Применить функцию для каждой группы в pyspark -pandas_udf (нет модуля с именем pyarrow) - PullRequest
0 голосов
/ 28 июня 2018

Я пытаюсь применить функцию к каждой группе набора данных в pyspark. Первая ошибка, которую я получил, была

Py4JError: An error occurred while calling o62.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist

Чтобы решить вышесказанное, я убрал функцию искры (у меня было spark.range()). Теперь ошибка устранена, но теперь я получаю следующее:

File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/pyspark/serializers.py", line 276, in load_stream
    import pyarrow as pa
ImportError: No module named pyarrow 

Но когда я пытаюсь сделать это самостоятельно, то есть.

df = pd.DataFrame({"a": [1, 2, 3]})
pa.Table.from_pandas(df)
pyarrow.Table
a: int64
__index_level_0__: int64
metadata
--------
{'pandas': '{"pandas_version": "0.23.0", "index_columns": ["__index_level_0__"], "columns": [{"metadata": null, "field_name": "a", "name": "a", "numpy_type": "int64", "pandas_type": "int64"}, {"metadata": null, "field_name": "__index_level_0__", "name": null, "numpy_type": "int64", "pandas_type": "int64"}], "column_indexes": [{"metadata": null, "field_name": null, "name": null, "numpy_type": "object", "pandas_type": "bytes"}]}'}

ПРИМЕР, КОТОРЫЙ НЕ БЫЛ - Взят от здесь

import pyspark.sql.functions as F
import pandas as pd

cols = ['id', 'val']
vals = [('A', 5), ('A', 3), ('A', 7), ('B', 12), ('B', 15), ('C', 3)]
d1 = sqlContext.createDataFrame(vals, cols)

>>> @F.pandas_udf(d1.schema, F.PandasUDFType.GROUPED_MAP)
... def subtract_mean(pdf):
...     return pdf.assign(v=pdf.v - pdf.v.mean())
...
>>> try1 = d1.groupby('id').apply(subtract_mean)
>>> try1.show()

Если я попытаюсь преобразовать данные в фрейм pandas, он не будет работать, поскольку у него нет атрибута schema.

Что мне здесь не хватает?

1 Ответ

0 голосов
/ 28 июня 2018

pyarrow должен присутствовать в пути на каждом рабочем узле.

  • Если вы запускаете этот код как один узел, убедитесь, что PYSPARK_PYTHON (и, возможно, его PYTHONPATH) совпадают с интерпретатором, который вы используете для тестирования pyarrow кода.
  • Если вы используете кластер, убедитесь, что на каждом узле установлена ​​pyarrow, в дополнение к указанным выше пунктам.

Кроме того, убедитесь, что установленная pyarrow версия больше или равна поддерживаемой минимальной ( 0.8 сегодня ) - хотя это должно вызвать другое исключение.

...