Apache Конвейер луча с DataFlowRunner встречается с _dill.py: «ModuleNotFoundError: нет модуля с именем« main »» при развертывании из облачной функции - PullRequest
0 голосов
/ 10 марта 2020

Я пытаюсь выполнить конвейер потока данных из облачных функций на GCP, используя Python sdk. Протестировал код на сервере ноутбука, где конвейер работает с DataFlowRunner. Однако при использовании облачных функций для вызова конвейера я получаю следующее:

Ошибка


Traceback (most recent call last): File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py", 
 line 346, in run_http_function result = _function_handler.invoke_user_function(flask.request) File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py", 
 line 217, in invoke_user_function return call_user_function(request_or_event) File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py", 
 line 210, in call_user_function return self._user_function(request_or_event) File "/user_code/main.py", 
 line 215, in run_main BUCKET=BUCKET) File "/user_code/main.py", 
 line 143, in dataflow create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED File "/env/local/lib/python3.7/site-packages/apache_beam/pipeline.py", 
 line 481, in __exit__ self.run().wait_until_finish() File "/env/local/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", 
 line 1449, in wait_until_finish (self.state, getattr(self._runner, 'last_error_msg', None)), self) apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", 
 line 286, in loads return dill.loads(s) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", 
 line 275, in loads return load(file, ignore, **kwds) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", 
 line 270, in load return Unpickler(file, ignore=ignore, **kwds).load() File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", 
 line 472, in load obj = StockUnpickler.load(self) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", 
 line 462, in find_class return StockUnpickler.find_class(self, module, name) ModuleNotFoundError: No module named 'main' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", 
 line 648, in do_work work_executor.execute() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", 
 line 176, in execute op.start() File "apache_beam/runners/worker/operations.py", 
 line 649, in apache_beam.runners.worker.operations.DoOperation.start File "apache_beam/runners/worker/operations.py", 
 line 651, in apache_beam.runners.worker.operations.DoOperation.start File "apache_beam/runners/worker/operations.py", 
 line 652, in apache_beam.runners.worker.operations.DoOperation.start File "apache_beam/runners/worker/operations.py", 
 line 261, in apache_beam.runners.worker.operations.Operation.start File "apache_beam/runners/worker/operations.py", 
 line 266, in apache_beam.runners.worker.operations.Operation.start File "apache_beam/runners/worker/operations.py", 
 line 597, in apache_beam.runners.worker.operations.DoOperation.setup File "apache_beam/runners/worker/operations.py", 
 line 602, in apache_beam.runners.worker.operations.DoOperation.setup File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", 
 line 290, in loads return dill.loads(s) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", 
 line 275, in loads return load(file, ignore, **kwds) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", 
 line 270, in load return Unpickler(file, ignore=ignore, **kwds).load() File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", 
 line 472, in load obj = StockUnpickler.load(self) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", 
 line 462, in find_class return StockUnpickler.find_class(self, module, name) ModuleNotFoundError: No module named 'main'

Так что мне кажется, что эта проблема возникает только тогда, когда поток данных вызывается без сервера. Я попытался добавить установочный файл , как предложено здесь , чтобы конвейер установил зависимости в правильных версиях, но это не помогло. Мне кажется, что этот вопрос похож на этот , однако я сомневаюсь, что единственный (непринятый) ответ будет работать, поскольку код облачной функции всегда запускается из main.py.

Код конвейера

class getResponse(beam.DoFn):
    def process(self, element, urlfield, pfield):
        response = requests.get(element[urlfield])
        status_code = response.status_code
        if status_code >= 200 and status_code < 300:
            yield {'id': element[pfield], 'response': response, 'url': element[urlfield]}

class getImageData(beam.DoFn):
    def process(self, element, responsefield, urlfield, pfield):
        p = element[responsefield]
        img = Image.open(BytesIO(p.content)).resize((10, 10), Image.ANTIALIAS).convert("L")
        yield {'id': element[pfield], 'url': element[urlfield], 'image_data': list(img.getdata())}

class outputDummies(beam.DoFn):
    def process(self, element, dummy_data, image_datafield, urlfield, pfield):
        if element[image_datafield] == dummy_data:
            yield {'id': element[pfield], 'url': element[urlfield]}

def dataflow(in_test_mode=True,
             query=None,
             table_schema=None,
             table_spec=None,
             dummy_data=None,
             job_name=None,
             PROJECT=None,
             REGION=None,
             BUCKET=None):
    if in_test_mode:
        RUNNER = "DirectRunner"
        OUTPUT_DIR = "gs://{0}/dummy_images/".format(BUCKET)
    else:
        RUNNER = "DataflowRunner"
        OUTPUT_DIR = "gs://{0}/dummy_images/".format(BUCKET)

    options = {
        "job_name": job_name,
        "project": PROJECT,
        "region": REGION,
        "staging_location": os.path.join(OUTPUT_DIR, "tmp", "staging"),
        "temp_location": os.path.join(OUTPUT_DIR, "tmp"),
        "streaming": False
    }
    opts = beam.pipeline.PipelineOptions(**options)
    # Run Beam
    with beam.Pipeline(RUNNER,
                       options=opts,
                       argv=['--setup_file', '/tmp/setup.py']) as p:
        (p |
         "Read data" >> beam.io.Read(beam.io.BigQuerySource(query=query,
                                                               use_standard_sql=True)) |
         "Get responses" >> beam.ParDo(getResponse(),
                                       urlfield='url',
                                       pfield='id') |

         "Process images" >> beam.ParDo(getImageData(),
                                        responsefield='response',
                                        urlfield='url',
                                        pfield='id') |

         "Output dummy images" >> beam.ParDo(outputDummies(),
                                             dummy_data=dummy_data,
                                             image_datafield='image_data',
                                             urlfield='url',
                                             pfield='id') |

         "Write to BQ" >> beam.io.WriteToBigQuery(
             table_spec,
             schema=table_schema,
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )
         )

needs.txt

apache_beam[gcp]==2.19.0
pillow==6.2.1
requests==2.23.0

Кто-нибудь есть обходной путь?

1 Ответ

0 голосов
/ 03 апреля 2020

Спасибо за все ваши предложения, в конце концов я не нашел решения для ошибки, но я пришел к решению для рабочего процесса. Как отметил AMargheriti, всегда есть шаблоны потока данных. Создав собственный шаблон кода, я смог запустить поток с помощью облачной функции. Полезная документация: шаблон создания потока данных , страница запущенных шаблонов и, наконец, это решение, поскольку предлагаемый API на странице запущенных шаблонов не позволяет задавать регион, в котором будет запускаться шаблон, тогда как dataflow (). Projects (). Location (). Templates (). Launch () разрешает добавление этой опции.

...