Я использую удаленный искровой кластер zeppelin connect.
Удаленная искра использует систему Python 2.7.
Я хочу перейти на miniconda3, установить lib pyarrow.
Что я делаю:
- Загрузите miniconda3, установите несколько библиотек, папку scp miniconda3 для master и slave для spark.
- добавление
PYSPARK_PYTHON="/usr/local/miniconda3/bin/python"
к spark-env.sh
в ведущем и подчиненных устройствах.
- перезапустить искру и дирижабль
Рабочий код
% spark.pyspark
import pandas as pd
from pyspark.sql.functions import pandas_udf,PandasUDFType
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def process_order_items(pdf):
pdf.loc[:, 'total_price'] = pdf['price'] * pdf['count']
d = {'has_discount':'count',
'clearance':'count',
'count': ['count', 'sum'],
'price_guide':'max',
'total_price': 'sum'
}
pdf1 = pdf.groupby('day').agg(d)
pdf1.columns = pdf1.columns.map('_'.join)
d1 = {'has_discount_count':'discount_order_count',
'clearance_count':'clearance_order_count',
'count_count':'order_count',
'count_sum':'sale_count',
'price_guide_max':'price_guide',
'total_price_sum': 'total_price'
}
pdf2 = pdf1.rename(columns=d1)
pdf2.loc[:, 'discount_sale_count'] = pdf.loc[pdf.has_discount>0, 'count'].resample(freq).sum()
pdf2.loc[:, 'clearance_sale_count'] = pdf.loc[pdf.clearance>0, 'count'].resample(freq).sum()
pdf2.loc[:, 'price'] = pdf2.total_price / pdf2.sale_count
pdf2 = pdf2.drop(pdf2[pdf2.order_count == 0].index)
return pdf2
results = df.groupby("store_id", "product_id").apply(process_order_items)
results.select(['store_id', 'price']).show(5)
Ошибка:
Py4JJavaError: An error occurred while calling o172.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 143, 10.104.33.18, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 150, in <lambda>
func = lambda _, it: map(mapper, it)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 276, in load_stream
import pyarrow as pa
ImportError: No module named pyarrow
10.104.33.18
является искровым мастером, поэтому я думаю, что PYSPARK_PYTHON
настроен неправильно.
Я вхожу в систему для ведущего и подчиненного, запускаю pyspark interpreter
в каждом и обнаружил, import pyarrow
не выдает исключение.
PS: pyarrow
также установлен в машине, на которой работает zeppelin.
Подробнее:
- искровой кластер установлен в A, B, C, zeppelin установлен в D.
PYSPARK_PYTHON
устанавливается в spark-env.sh
в каждом A, B, C
import pyarrow
в порядке с /usr/local/spark/bin/pyspark
в A, B, C /
import pyarrow
подходит для пользовательских питонов A, B, C (miniconda3)
import pyarrow
хорошо для Python по умолчанию D (miniconda3, путь отличается от A, B, C, но это не имеет значения)
Так что я совершенно не понимаю, почему это не работает.