Google Data Flow Передача данных хранилище Ключ в качестве входного параметра - PullRequest
0 голосов
/ 12 января 2020

Я пытаюсь создать шаблон потока данных Google для чтения файла JSON и загрузить его в хранилище данных Google. Ниже мой код.

Я могу успешно загрузить данные, однако я хотел бы передать ключ хранилища данных / KIND в качестве входного параметра из моего шаблона и создавать объекты, используя их. Может кто-нибудь помочь мне, как передать его код?

Ниже приведен фрагмент кода, получающий ввод во время выполнения. У меня --datastore_key как один из них.

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
                '--json_input',
                dest='json_input',
                type=str,
                required=False,
                help='Input file to read. This can be a local file or a file in a Google Storage Bucket.')

        parser.add_value_provider_argument(
                '--project_id',
                dest='project_id',
                type=str,
                required=False,
                help='Input Project ID.')

        parser.add_value_provider_argument(
                '--datastore_key',
                dest='datastore_key',
                type=str,
                required=False,
                help='The Key name')

Ниже приведен фрагмент, в котором я назначаю datastore_key для создания сущности согласно инструкции здесь .

class CreateHbaseRow(beam.DoFn): 
    def __init__(self, project_id):
       self.project_id = project_id

    def start_bundle(self):
        self.client = datastore.Client()

    def start_datastore(self, datastore_key):
        self.datastore_key = datastore_key

    def process(self, an_int):
        yield self.datastore_key.get() + an_int

    def process(self, element):
        try:
            key = self.client.key(datastore_key ,element['customerNumber'])
            entity = datastore.Entity(key=key)
            entity.update(element)  
            self.client.put(entity) 
        except:   
            logging.error("Failed with input: ", str(element))

Я создаю конвейерную линию, как показано ниже,

p = beam.Pipeline(options=options)

lines_text  = p | "Read Json From GCS" >> beam.io.ReadFromText(json_input)
lines_json = lines_text | "Convert To Json" >> beam.ParDo(ConvertToJson()) 
lines_json | "Create Entities From Json" >> beam.ParDo(CreateHbaseRow(project_id))

Я не получаю ключ хранилища данных , созданный, если я передаю его как параметр времени выполнения . Если я пишу такой жесткий код, он работает

key = self.client.key('customer' ,element['customerNumber'])

Я хочу что-то вроде этого

key = self.client.key(runtime_datastore_key ,runtime_datastore_id)

Может кто-нибудь помочь мне, как передать хранилище данных Key / Kind в качестве параметра времени выполнения?

Спасибо, GS

1 Ответ

0 голосов
/ 13 января 2020

Похоже, вы не передаете в datastore_key значение провайдера CreateHbaseRow.


Попробуйте использовать:

class CreateHbaseRow(beam.DoFn): 
    def __init__(self, project_id, datastore_key):
       self.project_id = project_id
       self.datastore_key = datastore_key

    def start_bundle(self):
        self.client = datastore.Client()

    def process(self, element):
        try:
            key = self.client.key(datastore_key.get(), element['customerNumber'])
            entity = datastore.Entity(key=key)
            entity.update(element)  
            self.client.put(entity) 
        except:   
            logging.error("Failed with input: ", str(element))

Обратите внимание, что я оставил project_id, поскольку казалось, что вы этого хотели, но мой код ниже не использует его .


Вы также хотите, чтобы вы передавали соответствующих поставщиков значений из экземпляра options в ваш DoFn. Таким образом, ваш код создания конвейера становится:

p = beam.Pipeline(options=options)

lines_text  = p | "Read Json From GCS" >> beam.io.ReadFromText(json_input)
lines_json = lines_text | "Convert To Json" >> beam.ParDo(ConvertToJson()) 
lines_json | "Create Entities From Json" >> beam.ParDo(CreateHbaseRow(options.project_id, options.datastore_key))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...