Проблема при запуске Numpy зависимого python кода на Spark - PullRequest
1 голос
/ 05 апреля 2020

Я запускаю задания PySpark через spark-submit в локальном кластере Spark. Я тестировал Ubuntu 18.04.3 LTS и Windows 10 Pro, получая точно такую ​​же ошибку. Я также получаю ту же ошибку при отправке заданий PySpark в Google Datapro c (Google Cloud Platform).

Все остальные мои задания PySpark запускаются без проблем, но как только на снимке появляется Numpy, он не отображается. t work.

Сообщение об ошибке:

Traceback (most recent call last):
  File "/home/xxx/git/xxx/dist/main.py", line 65, in <module>
    job_module = importlib.import_module('jobs.%s' % args.job_name)
  File "/usr/lib/python3.7/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1006, in _gcd_import
  File "<frozen importlib._bootstrap>", line 983, in _find_and_load
  File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 668, in _load_unlocked
  File "<frozen importlib._bootstrap>", line 638, in _load_backward_compatible
  File "jobs.zip/jobs/xxx/__init__.py", line 4, in <module>
  File "/tmp/spark-8a796112-efae-4f6f-a738-68ab88e5eab9/userFiles-d43d0bfd-55db-4247-905f-f002faecc817/xxx.py", line 1, in <module>
    import pandas as pd
  File "/tmp/spark-8a796112-efae-4f6f-a738-68ab88e5eab9/userFiles-d43d0bfd-55db-4247-905f-f002faecc817/libs.zip/pandas/__init__.py", line 17, in <module>
ImportError: Unable to import required dependencies:
numpy:

IMPORTANT: PLEASE READ THIS FOR ADVICE ON HOW TO SOLVE THIS ISSUE!

Importing the numpy c-extensions failed.
- Try uninstalling and reinstalling numpy.
- If you have already done that, then:
  1. Check that you expected to use Python3.7 from "/home/xxx/envs/xxx/bin/python",
     and that you have no directories in your PATH or PYTHONPATH that can
     interfere with the Python and numpy version "1.18.1" you're trying to use.
  2. If (1) looks fine, you can open a new issue at
     https://github.com/numpy/numpy/issues.  Please include details on:
     - how you installed Python
     - how you installed numpy
     - your operating system
     - whether or not you have multiple versions of Python installed
     - if you built from source, your compiler versions and ideally a build log

- If you're working with a numpy git repository, try `git clean -xdf`
  (removes all files not under version control) and rebuild numpy.

Note: this error has many possible causes, so please don't comment on
an existing issue about this - open a new one instead.

Original error was: No module named 'numpy.core._multiarray_umath'

Я выполнил действия по устранению неполадок, описанные в журналах ошибок, но безрезультатно, включая публикацию проблемы на Numpy GitHub.

Моя структура кода заимствована из этой статьи . В частности, мой файл main.py выглядит следующим образом:

# main.py

import os
import sys
import argparse
import importlib
import time

# Allows you to run stuff locally - If running locally, you'll have the jobs folder available, in production it's a zip
if os.path.exists('libs.zip'):
    sys.path.insert(0, 'libs.zip')
else:
    sys.path.insert(0, './libs')

if os.path.exists('jobs.zip'):
    sys.path.insert(0, 'jobs.zip')  # makes all modules inside it available for import
else:
    sys.path.insert(0, './jobs')

try:
    import pyspark
except:
    import findspark
    findspark.init()
    import pyspark

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Run a PySpark job')
    parser.add_argument('--job', type=str, required=True, dest='job_name', help='The name of the job module you want to run. (ex: poc will run job on jobs.poc package)')
    parser.add_argument('--job-args', nargs='*', help='Extra arguments to send to the PySpark job (example: --job-args template=manual-email1 foo=bar')

    args = parser.parse_args()
    print(f'Called with arguments: {args}')

    environment = {
        'PYSPARK_JOB_ARGS': ' '.join(args.job_args) if args.job_args else ''
    }

    job_args = dict()
    if args.job_args:
        job_args_tuples = [arg_str.split('=') for arg_str in args.job_args]
        print(f'job_args_tuples: {job_args_tuples}')
        job_args = {a[0]: a[1] for a in job_args_tuples}

    print(f'\nRunning job {args.job_name}...\nenvironment is {environment}\n')

    os.environ.update(environment)

    conf = pyspark.SparkConf() \
        .setAppName('xxx') \
        .setMaster('local[*]')

    sc = pyspark.SparkContext.getOrCreate(conf=conf)

    job_module = importlib.import_module('jobs.%s' % args.job_name)

    start = time.time()
    job_module.analyze(sc, **job_args)
    end = time.time()

    print(f'Execution of job {args.job_name} took {end - start} seconds')

Я запускаю задания, выполняя в командной строке следующее:

spark-submit --py-files jobs.zip,libs.zip,xxx.py,xxx.py,xxx.py main.py --job my_job

libs.zip содержит все пакеты зависимостей для мой проект. jobs.zip содержит отдельные задания PySpark. Другие файлы .py - это написанные мной дополнительные модули, которые импортируются заданиями PySpark.

Numpy / Python информация о версии:

Windows 10:

1.18.1 3.7.1 (default, Dec 10 2018, 22:54:23) [MSC v.1915 64 bit (AMD64)]

Ubuntu 18:

1.18.1 3.7.5 (default, Nov  7 2019, 10:50:52)
[GCC 8.3.0]
...