Как импортировать NumPy в конвейер Apache Beam, работающий на GCP Dataflow? - PullRequest
0 голосов
/ 03 октября 2019

Я пытаюсь написать конвейер Apache Beam, используя Python (3.7). Я сталкиваюсь с проблемами при импорте numpy , в частности, пытаюсь использовать numpy в классе преобразования DoFn, который я написал.

При запуске в GCP DataFlow я получаю следующую ошибку "NameError: name'numpy' не определен "

Для начала все работает так, как можно ожидать при использовании DirectRunner. Проблема заключается только в использовании GCP от DataFlow.

Я считаю, что проблема связана с тем, как работает область действия в GCP DataFlow, а не с самим импортом. Например, я могу успешно заставить импорт работать, если я добавляю его в метод «process» внутри моего класса, но безуспешно, когда я добавляю импорт в начало файла.

Я пробовал оба, используяфайл требований и файл setup.py в качестве параметров команды для конвейера, но ничего не изменилось. Опять же, я не верю, что проблема заключается в том, что проблема заключается в том, что DataFlow имеет неожиданную область видимости классов / функций.

файл setup.py

from __future__ import absolute_import
from __future__ import print_function
import setuptools

REQUIRED_PACKAGES = [
    'numpy',
    'Cython',
    'scipy',
    'google-cloud-bigtable'
]

setuptools.setup(
    name='my-pipeline',
    version='0.0.1',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
)

В целом, яЯ сталкиваюсь со многими проблемами с «областью действия», которые, я надеюсь, кто-то может помочь, поскольку документация по Apache Beam на самом деле не очень хорошо описывает это.

from __future__ import absolute_import
from __future__ import division

import apache_beam as beam
import numpy

class Preprocess(beam.DoFn):

    def process(self, element, *args, **kwargs):
        # Demonstrating how I want to call numpy in the process function
        if numpy.isnan(numpy.sum(element['signal'])):
            return [MyOject(element['signal'])]

def run(argv=None):
    parser = argparse.ArgumentParser()
    args, pipeline_args = parser.parse_known_args(argv)
    options = PipelineOptions(pipeline_args)

    p = beam.Pipeline(options=options)
    messages = (p | beam.io.ReadFromPubSub(subscription=args.input_subscription).with_output_types(bytes))
    lines = messages | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
    json_messages = lines | "Jsonify" >> beam.Map(lambda x: json.loads(x))

    preprocess_messages = json_messages | "Preprocess" >> beam.ParDo(Preprocess())
    result = p.run()
    result.wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Я ожидаю, что конвейер будет работать так же, как и он. работает при локальном запуске с DirectRunner, но вместо этого область видимости / импорт работает иначе и вызывает сбой моего конвейера.

1 Ответ

0 голосов
/ 03 октября 2019

Когда вы запускаете программу Python Apache Beam DirectRunner со своего рабочего стола, она запускается на вашем рабочем столе. Вы уже установили библиотеку NUMPY локально. Однако вы не сообщили Dataflow о загрузке и установке numpy. Вот почему ваша программа работает как DirectRunner, но не работает как DataflowRunner.

Редактируйте / Создайте обычный файл Python needs.txt и включите все зависимости, такие как numpy. Я предпочитаю использовать virtualdev, импортировать необходимые пакеты, убедиться, что моя программа работает под DirectRunner, а затем запустить pip freeze, чтобы создать мой список пакетов для needs.txt. Теперь Dataflow будет знать, какие пакеты импортировать, чтобы ваша программа работала в кластере Dataflow.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...