Как передать параметр в шаблон потока данных для построения конвейера - PullRequest
0 голосов
/ 10 мая 2018

Я пытаюсь сделать запрос предка как этот пример и перенести его в версию шаблона.

Проблема в том, что параметр ancestor_id предназначен для функции make_query во время построения конвейера. Если я не передам его при создании и создании шаблона, я получу RuntimeValueProviderError: RuntimeValueProvider (параметр: ancestor_id, тип: int) .get (), не вызванный из контекста времени выполнения . Но если я передаю его при создании шаблона, он выглядит как StaticValueProvider, который никогда не меняется при выполнении шаблона.

Как правильно передать параметр в шаблон для строительства конвейера?

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud.proto.datastore.v1 import entity_pb2
from google.cloud.proto.datastore.v1 import query_pb2
from googledatastore import helper as datastore_helper
from googledatastore import PropertyFilter

class Test(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument('--ancestor_id', type=int)

def make_query(ancestor_id):
    ancestor = entity_pb2.Key()
    datastore_helper.add_key_path(ancestor, KIND, ancestor_id)
    query = query_pb2.Query()
    datastore_helper.set_kind(query, KIND)
    datastore_helper.set_property_filter(query.filter, '__key__', PropertyFilter.HAS_ANCESTOR, ancestor)
    return query

pipeline_options = PipelineOptions()
test_options = pipeline_options.view_as(TestOptions)
with beam.Pipeline(options=pipline_options) as p:
  entities = p | ReadFromDatastore(PROJECT_ID, make_query(test_options.ancestor_id.get()))

1 Ответ

0 голосов
/ 22 мая 2018

Две проблемы.

  1. Метод ValueProvider.value.get() может выполняться только во время выполнения, например ParDo.process(). См. пример .

  2. Кроме того, ваша проблема заключается в том, что вы используете Google Cloud Datastore IO (запрос из хранилища данных). На сегодня (май 2018 г.) официальная документация указывает, что IO хранилища данных НЕ принимает параметры шаблона времени выполнения.

Для Python, в частности,

Следующие соединители принимают параметры времени выполнения. Файловые операции ввода-вывода: textio, avroio, tfrecordio

A обходной путь : вы, вероятно, можете сначала выполнить запрос без каких-либо шаблонных параметров, чтобы получить коллекцию объектов PC. В настоящее время, поскольку любые преобразователи могут принимать шаблонный параметр, вы можете использовать его как фильтр . Но это зависит от вашего варианта использования и может быть неприменимо к вам.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...