Я пытаюсь написать конвейер Apache Beam, используя Python (3.7). Я сталкиваюсь с проблемами при импорте numpy , в частности, пытаюсь использовать numpy в классе преобразования DoFn, который я написал.
При запуске в GCP DataFlow я получаю следующую ошибку "NameError: name'numpy' не определен "
Для начала все работает так, как можно ожидать при использовании DirectRunner. Проблема заключается только в использовании GCP от DataFlow.
Я считаю, что проблема связана с тем, как работает область действия в GCP DataFlow, а не с самим импортом. Например, я могу успешно заставить импорт работать, если я добавляю его в метод «process» внутри моего класса, но безуспешно, когда я добавляю импорт в начало файла.
Я пробовал оба, используяфайл требований и файл setup.py в качестве параметров команды для конвейера, но ничего не изменилось. Опять же, я не верю, что проблема заключается в том, что проблема заключается в том, что DataFlow имеет неожиданную область видимости классов / функций.
файл setup.py
from __future__ import absolute_import
from __future__ import print_function
import setuptools
REQUIRED_PACKAGES = [
'numpy',
'Cython',
'scipy',
'google-cloud-bigtable'
]
setuptools.setup(
name='my-pipeline',
version='0.0.1',
install_requires=REQUIRED_PACKAGES,
packages=setuptools.find_packages(),
)
В целом, яЯ сталкиваюсь со многими проблемами с «областью действия», которые, я надеюсь, кто-то может помочь, поскольку документация по Apache Beam на самом деле не очень хорошо описывает это.
from __future__ import absolute_import
from __future__ import division
import apache_beam as beam
import numpy
class Preprocess(beam.DoFn):
def process(self, element, *args, **kwargs):
# Demonstrating how I want to call numpy in the process function
if numpy.isnan(numpy.sum(element['signal'])):
return [MyOject(element['signal'])]
def run(argv=None):
parser = argparse.ArgumentParser()
args, pipeline_args = parser.parse_known_args(argv)
options = PipelineOptions(pipeline_args)
p = beam.Pipeline(options=options)
messages = (p | beam.io.ReadFromPubSub(subscription=args.input_subscription).with_output_types(bytes))
lines = messages | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
json_messages = lines | "Jsonify" >> beam.Map(lambda x: json.loads(x))
preprocess_messages = json_messages | "Preprocess" >> beam.ParDo(Preprocess())
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Я ожидаю, что конвейер будет работать так же, как и он. работает при локальном запуске с DirectRunner, но вместо этого область видимости / импорт работает иначе и вызывает сбой моего конвейера.