Считывание из BigQuerySource с использованием пользовательского worker_harness_container_image завершается с ошибкой «не найден обязательный параметр serialized_source» - PullRequest
0 голосов
/ 21 января 2020

Я пытаюсь очень простой конвейер в потоке данных, используя пользовательские worker_harness_container_imageexperiment=beam_fn_api):

main.py:

import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
import logging


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    p = beam.Pipeline(options=pipeline_options)

    (
        p
        | "Read from BigQuery" >> beam.io.Read(beam.io.BigQuerySource(query="SELECT 1", use_standard_sql=True))
        | "ParDo" >> beam.ParDo(Dummy())
    )

    p.run().wait_until_finish()


class Dummy(beam.DoFn):
    def process(self, element):
        pass


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()

Dockerfile просто:

FROM apachebeam/python3.7_sdk

Запускается так:

python3.7 -m main \
--runner DataflowRunner \
--project project_id \
--temp_location gs://bucket/tmp/ \
--region europe-west1 \
--zone europe-north1-c \
--worker_harness_container_image eu.gcr.io/project_id/image:latest \
--experiment=beam_fn_api

Это происходит сбой при

Caused by: org.apache.beam.runners.dataflow.util.Structs$ParameterNotFoundException: 
didn’t find required parameter serialized_source in {@type=BigQueryAvroSource, 
bigquery_export_schema={value={“fields”:[{“mode”:“NULLABLE”,“name”:“f0_“,”type”:“INTEGER”}]}, @type=http://schema.org/Text}, 
filename={value=gs://bucket/000000000000.avro, @type=http://schema.org/Text}}

Обратите внимание, что чтение временного файла Avro с помощью задания BigQuery Использование AvroIO работает просто отлично, т.е.:

    (
        p
        | "Read from Avro" >> beam.io.Read(beam.io.avroio.ReadFromAvro("gs://bucket/000000000000.avro"))
        | "ParDo" >> beam.ParDo(Dummy())
    )

1 Ответ

0 голосов
/ 03 февраля 2020

Согласно тому, что я читаю в этой теме , Docker контейнеры, используемые для рабочих потоков данных, в настоящее время являются частными и не могут быть изменены или настроены. Это именно то, что делает worker_harness_container_image, выбирая другой контейнер. Кроме того, я не могу воспроизвести вашу проблему, и я не могу найти документацию для этого метода, поэтому он кажется неподдерживаемым. Мой совет - запустить ваш конвейер без worker_harness_container_image и посмотреть, работает ли он должным образом.

...