Я пытаюсь запустить конвейер Dataflow из Cloud Functions на GCP, используя Python SDK. Я протестировал код на notebook-сервере, где конвейер успешно работает с DataflowRunner. Однако при вызове конвейера из Cloud Functions я получаю следующую ошибку:
Ошибка
Traceback (most recent call last): File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py",
line 346, in run_http_function result = _function_handler.invoke_user_function(flask.request) File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py",
line 217, in invoke_user_function return call_user_function(request_or_event) File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py",
line 210, in call_user_function return self._user_function(request_or_event) File "/user_code/main.py",
line 215, in run_main BUCKET=BUCKET) File "/user_code/main.py",
line 143, in dataflow create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED File "/env/local/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 481, in __exit__ self.run().wait_until_finish() File "/env/local/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py",
line 1449, in wait_until_finish (self.state, getattr(self._runner, 'last_error_msg', None)), self) apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 286, in loads return dill.loads(s) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 275, in loads return load(file, ignore, **kwds) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 270, in load return Unpickler(file, ignore=ignore, **kwds).load() File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 472, in load obj = StockUnpickler.load(self) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 462, in find_class return StockUnpickler.find_class(self, module, name) ModuleNotFoundError: No module named 'main' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py",
line 648, in do_work work_executor.execute() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py",
line 176, in execute op.start() File "apache_beam/runners/worker/operations.py",
line 649, in apache_beam.runners.worker.operations.DoOperation.start File "apache_beam/runners/worker/operations.py",
line 651, in apache_beam.runners.worker.operations.DoOperation.start File "apache_beam/runners/worker/operations.py",
line 652, in apache_beam.runners.worker.operations.DoOperation.start File "apache_beam/runners/worker/operations.py",
line 261, in apache_beam.runners.worker.operations.Operation.start File "apache_beam/runners/worker/operations.py",
line 266, in apache_beam.runners.worker.operations.Operation.start File "apache_beam/runners/worker/operations.py",
line 597, in apache_beam.runners.worker.operations.DoOperation.setup File "apache_beam/runners/worker/operations.py",
line 602, in apache_beam.runners.worker.operations.DoOperation.setup File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 290, in loads return dill.loads(s) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 275, in loads return load(file, ignore, **kwds) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 270, in load return Unpickler(file, ignore=ignore, **kwds).load() File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 472, in load obj = StockUnpickler.load(self) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 462, in find_class return StockUnpickler.find_class(self, module, name) ModuleNotFoundError: No module named 'main'
Мне кажется, что эта проблема возникает только тогда, когда Dataflow запускается из бессерверной (serverless) среды. Я попытался добавить установочный файл (setup.py), как предложено здесь, чтобы конвейер установил зависимости нужных версий, но это не помогло. Мне кажется, что этот вопрос похож на этот, однако я сомневаюсь, что единственный (непринятый) ответ сработает, поскольку код Cloud Functions всегда запускается из main.py.
Код конвейера
class getResponse(beam.DoFn):
    """Fetch the URL stored in each element and emit successful responses."""

    def process(self, element, urlfield, pfield):
        """Request ``element[urlfield]`` and yield a record on a 2xx status.

        Args:
            element: Mapping that holds the URL and the identifier.
            urlfield: Key of ``element`` whose value is the URL to fetch.
            pfield: Key of ``element`` whose value is copied to ``'id'``.

        Yields:
            A dict with keys ``'id'``, ``'response'`` (the object returned by
            ``requests.get``) and ``'url'``. Nothing is yielded when the
            status code is outside 200-299.
        """
        url = element[urlfield]
        reply = requests.get(url)
        # Anything outside the 2xx range is dropped silently.
        if not 200 <= reply.status_code < 300:
            return
        yield {'id': element[pfield], 'response': reply, 'url': url}
class getImageData(beam.DoFn):
    """Turn a downloaded image into a flat list of pixel values."""

    def process(self, element, responsefield, urlfield, pfield):
        """Decode, shrink and grayscale the image carried by ``element``.

        Args:
            element: Mapping that holds the response, URL and identifier.
            responsefield: Key of ``element`` whose value exposes the raw
                image bytes through a ``content`` attribute.
            urlfield: Key of ``element`` whose value is copied to ``'url'``.
            pfield: Key of ``element`` whose value is copied to ``'id'``.

        Yields:
            A dict with keys ``'id'``, ``'url'`` and ``'image_data'``, the
            latter being ``list(img.getdata())`` of the 10x10 "L"-mode image.
        """
        raw_bytes = element[responsefield].content
        picture = Image.open(BytesIO(raw_bytes))
        # Resize first, then convert to single-channel "L" mode.
        thumbnail = picture.resize((10, 10), Image.ANTIALIAS)
        grayscale = thumbnail.convert("L")
        yield {
            'id': element[pfield],
            'url': element[urlfield],
            'image_data': list(grayscale.getdata()),
        }
class outputDummies(beam.DoFn):
    """Keep only the elements whose image data equals a reference value."""

    def process(self, element, dummy_data, image_datafield, urlfield, pfield):
        """Yield the id and URL of ``element`` when its image data matches.

        Args:
            element: Mapping that holds the image data, URL and identifier.
            dummy_data: Reference value compared with ``==`` against the
                element's image data.
            image_datafield: Key of ``element`` holding the image data.
            urlfield: Key of ``element`` whose value is copied to ``'url'``.
            pfield: Key of ``element`` whose value is copied to ``'id'``.

        Yields:
            A dict with keys ``'id'`` and ``'url'`` for matching elements;
            nothing otherwise.
        """
        pixels = element[image_datafield]
        matches_dummy = pixels == dummy_data
        if matches_dummy:
            yield {'id': element[pfield], 'url': element[urlfield]}
def dataflow(in_test_mode=True,
             query=None,
             table_schema=None,
             table_spec=None,
             dummy_data=None,
             job_name=None,
             PROJECT=None,
             REGION=None,
             BUCKET=None):
    """Build and run the dummy-image detection pipeline.

    Reads rows from BigQuery with ``query``, downloads each row's ``url``,
    reduces the image to pixel data, keeps rows whose pixel data equals
    ``dummy_data`` and writes their ``id``/``url`` to ``table_spec``.

    Args:
        in_test_mode: When true the pipeline runs on ``DirectRunner``,
            otherwise on ``DataflowRunner``.
        query: Standard-SQL query passed to ``beam.io.BigQuerySource``.
        table_schema: Schema passed to ``beam.io.WriteToBigQuery``.
        table_spec: Destination table passed to ``beam.io.WriteToBigQuery``.
        dummy_data: Reference pixel data that marks an image as a dummy.
        job_name: Value of the ``job_name`` pipeline option.
        PROJECT: Value of the ``project`` pipeline option.
        REGION: Value of the ``region`` pipeline option.
        BUCKET: GCS bucket name used for staging and temp locations.
    """
    runner = "DirectRunner" if in_test_mode else "DataflowRunner"
    # Both modes use the same GCS prefix for staging/temp files.
    output_dir = "gs://{0}/dummy_images/".format(BUCKET)

    options = {
        "job_name": job_name,
        "project": PROJECT,
        "region": REGION,
        "staging_location": os.path.join(output_dir, "tmp", "staging"),
        "temp_location": os.path.join(output_dir, "tmp"),
        "streaming": False,
        # beam.Pipeline ignores ``argv`` when ``options`` is supplied, so the
        # setup file must be part of the options themselves to take effect.
        # NOTE(review): the workers must be able to import the module that
        # defines the DoFns; presumably the package built by this setup file
        # has to provide them - confirm.
        "setup_file": "/tmp/setup.py",
    }
    opts = beam.pipeline.PipelineOptions(**options)

    # Leaving the ``with`` block runs the pipeline and waits for it to finish.
    with beam.Pipeline(runner, options=opts) as p:
        (p
         | "Read data" >> beam.io.Read(
             beam.io.BigQuerySource(query=query, use_standard_sql=True))
         | "Get responses" >> beam.ParDo(getResponse(),
                                         urlfield='url',
                                         pfield='id')
         | "Process images" >> beam.ParDo(getImageData(),
                                          responsefield='response',
                                          urlfield='url',
                                          pfield='id')
         | "Output dummy images" >> beam.ParDo(outputDummies(),
                                               dummy_data=dummy_data,
                                               image_datafield='image_data',
                                               urlfield='url',
                                               pfield='id')
         | "Write to BQ" >> beam.io.WriteToBigQuery(
             table_spec,
             schema=table_schema,
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
requirements.txt
apache_beam[gcp]==2.19.0
pillow==6.2.1
requests==2.23.0
Есть ли у кого-нибудь обходной путь?