У меня большой набор данных в GCS в формате json, который мне нужно загрузить в BigQuery. Проблема в том, что данные json хранятся не в Nd Json, а в нескольких больших json файлах, где каждый ключ в JSON должен быть действительно полем в json.
Например - следующее Json:
{
"johnny": {
"type": "student"
},
"jeff": {
"type": "teacher"
}
}
должно быть преобразовано в
[
{
"name": "johnny",
"type": "student"
},
{
"name": "jeff",
"type": "teacher"
}
]
Я пытаюсь решить эту проблему с помощью потока данных Google и Apache Beam, но производительность ужасна, так как ech "Worker" должен проделать большую работу:
class JsonToNdJsonDoFn(beam.DoFn):
def __init__(self, pk_field_name):
self.__pk_field_name = pk_field_name
def process(self, line):
for key, record in json.loads(line).items():
record[self.__pk_field_name] = key
yield record
Я знаю, что это можно как-то решить, реализовав его как SplittableDoFn - но реализацию пример в Python там не совсем понятен. Как мне создать этот DoFn как разделяемый и как он будет использоваться как часть конвейера?