I am trying to run a very simple pipeline on Dataflow using a custom worker_harness_container_image (and experiment=beam_fn_api):
main.py:
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
import logging


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    p = beam.Pipeline(options=pipeline_options)
    (
        p
        | "Read from BigQuery" >> beam.io.Read(beam.io.BigQuerySource(query="SELECT 1", use_standard_sql=True))
        | "ParDo" >> beam.ParDo(Dummy())
    )
    p.run().wait_until_finish()


class Dummy(beam.DoFn):
    def process(self, element):
        pass


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()
The Dockerfile is simply:
FROM apachebeam/python3.7_sdk
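For context, the image is built and pushed roughly like this (a minimal sketch; the tag eu.gcr.io/project_id/image:latest is simply the one passed to the run command below):

# Build the harness image from the Dockerfile above and push it to Container Registry
gcloud auth configure-docker
docker build -t eu.gcr.io/project_id/image:latest .
docker push eu.gcr.io/project_id/image:latest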
It is launched like this:
python3.7 -m main \
  --runner DataflowRunner \
  --project project_id \
  --temp_location gs://bucket/tmp/ \
  --region europe-west1 \
  --zone europe-north1-c \
  --worker_harness_container_image eu.gcr.io/project_id/image:latest \
  --experiment=beam_fn_api
It fails with:
Caused by: org.apache.beam.runners.dataflow.util.Structs$ParameterNotFoundException:
didn't find required parameter serialized_source in {@type=BigQueryAvroSource,
bigquery_export_schema={value={"fields":[{"mode":"NULLABLE","name":"f0_","type":"INTEGER"}]}, @type=http://schema.org/Text},
filename={value=gs://bucket/000000000000.avro, @type=http://schema.org/Text}}
Note that reading the temporary Avro file exported by the BigQuery job directly with AvroIO works just fine, i.e.:
(
    p
    | "Read from Avro" >> beam.io.avroio.ReadFromAvro("gs://bucket/000000000000.avro")
    | "ParDo" >> beam.ParDo(Dummy())
)