Я пытаюсь получить данные из хранилища данных из другого пространства имен, используя поставщика значений времени выполнения в шаблоне потока данных.
Установить время выполнения ValueProvider
class CatalaugeOption(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--partner', default='', dest='partner', help='current date directory')
my_options = options.view_as(CatalaugeOption)
parser.add_argument('--kind', dest='kind', default='Product', help='Datastore Kind')
known_args, pipeline_args = parser.parse_known_args(argv)
Чтение данных из хранилища данных
with beam.Pipeline(argv=pipeline_args) as p:
req_options = p.options.get_all_options()
project = req_options['project']
namespace = my_options.partner
kind = known_args.kind
query = query_pb2.Query()
query.kind.add().name = kind
protobufs = p | 'Read From Datastore' >> ReadFromDatastore(
project, query, namespace=namespace)
p.run()
return
ValueError: Неверный DisplayDataItem. Значение RuntimeValueProvider (параметр: партнер, тип: str, default_value: '') имеет неподдерживаемый тип.