Как заставить pyspark использовать собственный питон? - PullRequest
0 голосов
/ 05 сентября 2018

Я использую удаленный искровой кластер zeppelin connect.

Удаленная искра использует систему Python 2.7.

Я хочу перейти на miniconda3, установить lib pyarrow. Что я делаю:

  1. Загрузите miniconda3, установите несколько библиотек, папку scp miniconda3 для master и slave для spark.
  2. добавление PYSPARK_PYTHON="/usr/local/miniconda3/bin/python" к spark-env.sh в ведущем и подчиненных устройствах.
  3. перезапустить искру и дирижабль
  4. Рабочий код

    % spark.pyspark

    import pandas as pd
    from pyspark.sql.functions import pandas_udf,PandasUDFType
    
    
    @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
    def process_order_items(pdf):
    
        pdf.loc[:, 'total_price'] = pdf['price'] * pdf['count']
    
        d = {'has_discount':'count',
            'clearance':'count',
            'count': ['count', 'sum'],
            'price_guide':'max',
            'total_price': 'sum'
    
        }
    
        pdf1 = pdf.groupby('day').agg(d)
        pdf1.columns = pdf1.columns.map('_'.join)
        d1 = {'has_discount_count':'discount_order_count',
            'clearance_count':'clearance_order_count',
            'count_count':'order_count',
            'count_sum':'sale_count',
            'price_guide_max':'price_guide',
            'total_price_sum': 'total_price'
        }
    
        pdf2 = pdf1.rename(columns=d1)
    
        pdf2.loc[:, 'discount_sale_count'] = pdf.loc[pdf.has_discount>0, 'count'].resample(freq).sum()
        pdf2.loc[:, 'clearance_sale_count'] = pdf.loc[pdf.clearance>0, 'count'].resample(freq).sum()
        pdf2.loc[:, 'price'] = pdf2.total_price / pdf2.sale_count
    
        pdf2 = pdf2.drop(pdf2[pdf2.order_count == 0].index)
    
        return pdf2
    
    
    results = df.groupby("store_id", "product_id").apply(process_order_items)
    
    results.select(['store_id', 'price']).show(5)
    

Ошибка:

Py4JJavaError: An error occurred while calling o172.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 143, 10.104.33.18, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 150, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 276, in load_stream
    import pyarrow as pa
ImportError: No module named pyarrow

10.104.33.18 является искровым мастером, поэтому я думаю, что PYSPARK_PYTHON настроен неправильно.

Я вхожу в систему для ведущего и подчиненного, запускаю pyspark interpreter в каждом и обнаружил, import pyarrow не выдает исключение.

PS: pyarrow также установлен в машине, на которой работает zeppelin.


Подробнее:

  1. искровой кластер установлен в A, B, C, zeppelin установлен в D.
  2. PYSPARK_PYTHON устанавливается в spark-env.sh в каждом A, B, C
  3. import pyarrow в порядке с /usr/local/spark/bin/pyspark в A, B, C /
  4. import pyarrow подходит для пользовательских питонов A, B, C (miniconda3)
  5. import pyarrow хорошо для Python по умолчанию D (miniconda3, путь отличается от A, B, C, но это не имеет значения)

Так что я совершенно не понимаю, почему это не работает.

1 Ответ

0 голосов
/ 05 сентября 2018

Перейдите в папку конфигурации zeppelin ( $ ZEPPELIN_HOME / conf ) и найдите файл interpreter.json

Найдите интерпретатор, который вы хотите исправить (искра) в этом случае.

Обновите следующее свойство, чтобы указать путь к вашей установке Python:

- "zeppelin.pyspark.python": "python"
+ "zeppelin.pyspark.python": "/usr/bin/anaconda/bin/python"
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...