Я пытаюсь сделать запрос предка как этот пример и перенести его в версию шаблона.
Проблема в том, что параметр ancestor_id предназначен для функции make_query во время построения конвейера.
Если я не передам его при создании и создании шаблона, я получу RuntimeValueProviderError: RuntimeValueProvider (параметр: ancestor_id, тип: int) .get (), не вызванный из контекста времени выполнения . Но если я передаю его при создании шаблона, он выглядит как StaticValueProvider, который никогда не меняется при выполнении шаблона.
Как правильно передать параметр в шаблон для строительства конвейера?
import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud.proto.datastore.v1 import entity_pb2
from google.cloud.proto.datastore.v1 import query_pb2
from googledatastore import helper as datastore_helper
from googledatastore import PropertyFilter
class Test(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--ancestor_id', type=int)
def make_query(ancestor_id):
ancestor = entity_pb2.Key()
datastore_helper.add_key_path(ancestor, KIND, ancestor_id)
query = query_pb2.Query()
datastore_helper.set_kind(query, KIND)
datastore_helper.set_property_filter(query.filter, '__key__', PropertyFilter.HAS_ANCESTOR, ancestor)
return query
pipeline_options = PipelineOptions()
test_options = pipeline_options.view_as(TestOptions)
with beam.Pipeline(options=pipline_options) as p:
entities = p | ReadFromDatastore(PROJECT_ID, make_query(test_options.ancestor_id.get()))