У меня есть простой конвейер потока данных и я пытаюсь выполнить его из облачной оболочки,
Код:
from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
with beam.Pipeline(options=PipelineOptions()) as p:
lines = p | 'Read' >> beam.io.ReadFromText('test.csv')
lines | 'Write' >> beam.io.WriteToText('gs://bucket/output_20193003', file_name_suffix='.csv')
result = p.run()
result.wait_until_finish()
Команда, используемая для выполнения:
python -m simple_pipeline --runner DataflowRunner --project myproject --staging_location gs://bucket/staging --temp_location gs://bucket/temp
Наблюдения: При выполнении из оболочки Cloud я сталкиваюсь с ошибкой ниже,
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 859, in run_stages
pcoll_buffers, safe_coders).process_bundle.metrics
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 970, in run_stage
self._progress_frequency).process_bundle(data_input, data_output)
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1174, in process_bundle
result_future = self._controller.control_handler.push(process_bundle)
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1054, in push
response = self.worker.do_instruction(request)
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 208, in do_instruction
request.instruction_id)
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 230, in process_bundle
processor.process_bundle(instruction_id)
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 301, in process_bundle
op.finish()
File "apache_beam/runners/worker/operations.py", line 398, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/worker/operations.py", line 399, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/worker/operations.py", line 400, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/common.py", line 598, in apache_beam.runners.common.DoFnRunner.finish
File "apache_beam/runners/common.py", line 589, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 618, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 299, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "apache_beam/runners/common.py", line 302, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "apache_beam/runners/common.py", line 693, in apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/io/iobase.py", line 1005, in finish_bundle
yield WindowedValue(self.writer.close(), window.MAX_TIMESTAMP,
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/io/filebasedsink.py", line 388, in close
self.sink.close(self.temp_handle)
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/io/filebasedsink.py", line 148, in close
file_handle.close()
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/io/filesystemio.py", line 201, in close
self._uploader.finish()
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/io/gcp/gcsio.py", line 553, in finish
raise self._upload_thread.last_error # pylint: disable=raising-bad-type
RuntimeError: SSLHandshakeError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed (_ssl.c:661) [while running 'Write/Write/WriteImpl/WriteBundles']
Примечание: Я вижу эту ошибку только сОблачная оболочка и тот же код и команды отлично работают при выполнении с локального компьютера - Использование SDK или PyCharm IDE.
В чем проблема с моей облачной оболочкой?Как это исправить?Пожалуйста, предложите.