Я пытаюсь создать шаблон потока данных Google для чтения файла JSON и загрузить его в хранилище данных Google. Ниже мой код.
Я могу успешно загрузить данные, однако я хотел бы передать ключ хранилища данных / KIND в качестве входного параметра из моего шаблона и создавать объекты, используя их. Может кто-нибудь помочь мне, как передать его код?
Ниже приведен фрагмент кода, получающий ввод во время выполнения. У меня --datastore_key как один из них.
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--json_input',
dest='json_input',
type=str,
required=False,
help='Input file to read. This can be a local file or a file in a Google Storage Bucket.')
parser.add_value_provider_argument(
'--project_id',
dest='project_id',
type=str,
required=False,
help='Input Project ID.')
parser.add_value_provider_argument(
'--datastore_key',
dest='datastore_key',
type=str,
required=False,
help='The Key name')
Ниже приведен фрагмент, в котором я назначаю datastore_key для создания сущности согласно инструкции здесь .
class CreateHbaseRow(beam.DoFn):
def __init__(self, project_id):
self.project_id = project_id
def start_bundle(self):
self.client = datastore.Client()
def start_datastore(self, datastore_key):
self.datastore_key = datastore_key
def process(self, an_int):
yield self.datastore_key.get() + an_int
def process(self, element):
try:
key = self.client.key(datastore_key ,element['customerNumber'])
entity = datastore.Entity(key=key)
entity.update(element)
self.client.put(entity)
except:
logging.error("Failed with input: ", str(element))
Я создаю конвейерную линию, как показано ниже,
p = beam.Pipeline(options=options)
lines_text = p | "Read Json From GCS" >> beam.io.ReadFromText(json_input)
lines_json = lines_text | "Convert To Json" >> beam.ParDo(ConvertToJson())
lines_json | "Create Entities From Json" >> beam.ParDo(CreateHbaseRow(project_id))
Я не получаю ключ хранилища данных , созданный, если я передаю его как параметр времени выполнения . Если я пишу такой жесткий код, он работает
key = self.client.key('customer' ,element['customerNumber'])
Я хочу что-то вроде этого
key = self.client.key(runtime_datastore_key ,runtime_datastore_id)
Может кто-нибудь помочь мне, как передать хранилище данных Key / Kind в качестве параметра времени выполнения?
Спасибо, GS