Возникла проблема при создании пользовательского шаблона для Cloud Dataflow. это простой код, который берет данные из корзины ввода и загружает их в BigQuery. Мы хотим загрузить много таблиц, поэтому пытаемся создать собственный шаблон. как только это сработает, следующим шагом будет передача набора данных также в качестве параметра.
Сообщение об ошибке:
AttributeError: у объекта 'StaticValueProvider' нет атрибута 'datasetId'
Код
class ContactUploadOptions(PipelineOptions):
"""
Runtime Parameters given during template execution
path and organization parameters are necessary for execution of pipeline
campaign is optional for committing to bigquery
"""
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--input',
type=str,
help='Path of the file to read from'
)
parser.add_value_provider_argument(
'--output',
type=str,
help='Output BQ table for the pipeline')
def run(argv=None):
"""The main function which creates the pipeline and runs it."""
global PROJECT
from google.cloud import bigquery
# Retrieve project Id and append to PROJECT form GoogleCloudOptions
# Initialize runtime parameters as object
contact_options = PipelineOptions().view_as(ContactUploadOptions)
PROJECT = PipelineOptions().view_as(GoogleCloudOptions).project
client = bigquery.Client(project=PROJECT)
dataset = client.dataset('pharma')
data_ingestion = DataIngestion()
pipeline_options = PipelineOptions()
# Save main session state so pickled functions and classes
# defined in __main__ can be unpickled
pipeline_options.view_as(SetupOptions).save_main_session = True
# Parse arguments from command line.
#data_ingestion = DataIngestion()
# Instantiate pipeline
options = PipelineOptions()
p = beam.Pipeline(options=options)
(p
| 'Read from a File' >> beam.io.ReadFromText(contact_options.input, skip_header_lines=0)
| 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s))
| 'Write to BigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
contact_options.output,
schema='assetid:INTEGER,assetname:STRING,prodcd:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
)
Моя команда следующая:
python3 -m pharm_template --runner DataflowRunner --project jupiter-120 --staging_location gs://input-cdc/temp/staging --temp_location gs://input-cdc/temp/ --template_location gs://code-cdc/temp/templates/jupiter_pipeline_template
Что я пробовал:
Я пытался передать --input
и --output
Я тоже пробовал --experiment=use_beam_bq_sink
но безрезультатно. Я также попытался передать datasetID
datasetId = StaticValueProvider(str, 'pharma')
, но не повезло. Если кто-то создал шаблон, который загружается в BQ, тогда я могу взять реплику и исправить эту проблему.