Загрузка многих jsons в BQ с использованием Apache Beam & Data Flow - json ошибки схемы - PullRequest
0 голосов
/ 22 января 2020

Мне нужно загрузить много файлов json в BQ, используя Apache Beam в Python. Jsons имеет довольно сложную схему (с несколькими уровнями иерархии), и что более важно - она ​​не согласована. Некоторые поля настолько редки, что появляются только в 0,01% процентов jsons. Я не могу позволить BQ выводить схему в методе WriteToBigQuery, используя AUTO_DETECT, потому что он проверяет только 100 строк - этого недостаточно. Я попытался построить схему для 0,1% процентов данных с помощью утилиты python generate-schema - но, опять же, некоторые поля настолько редки, что все равно не работают.

Нет такого поля: FIELD_NAME.

Я попытался найти способ загрузки файла независимо от каких-либо ошибок и сохранить ошибки в таблице ошибок, которую я могу обработать отдельно. Тем не менее, я не нашел, чтобы сделать это в модуле WriteToBigQuery. Я также пытался проверить каждый json перед отправкой в ​​конвейер, но это было очень медленно. Я также попытался «отфильтровать» json в соответствии с указанной схемой, но опять-таки это требует прохождения всех json - очень медленно, так как каждый размер json составляет около 13 КБ.

Кто-нибудь сталкивался с чем-нибудь, что может помочь? Странно, что нет никакого атрибута max_rejected для использования при записи в BQ с использованием Apache Beam. Любая идея о том, как справиться с этим, будет оценена.

Ответы [ 2 ]

0 голосов
/ 26 января 2020

В итоге я отформатировал JSON в соответствии с ошибками, которые я получал от BQ. Я заметил, что пропущенные поля всегда были полностью вложены в 2-3 поля в JSON, поэтому я просто преобразовал эти поля как JSON - и таким образом я успешно загрузил данные. Тем не менее, таблица журнала ошибок с максимальным отклоненным параметром в Apache Beam была бы чрезвычайно полезна.

0 голосов
/ 24 января 2020

Возможность расчета схемы «вручную». Если мы представляем схемы в виде набора кортежей set([field, type]) - например, set([('name', str), ('age', int)]).

class CombineSchemasByDestination(beam.DoFn):
  def __init__(self):
    self.schemas_per_dest = defaultdict(set)

  def process(self, dest_schema):
    destination, schemas = dest_schema
    for s in schemas:
      self.schemas_per_dest[destination].union(s)

  def finish_bundle(self):
    for dest, schema in self.schemas_per_dest.items():
      yield (dest, schema)

schemas_per_dest = (my_data 
                    | beam.Map(lambda row: (get_destination(row), 
                                            [get_row_schema(row)]))
                    | beam.ParDo(CombineSchemasByDestination())
                    | beam.GroupByKey()
                    | beam.CombineSchemasByDestination())

my_data | beam.WriteToBigQuery(....
  schema=lambda dest, schema_map: schema_map.get(dest),
  schema_side_inputs=(beam.pvalue.AsDict(schemas_per_dest,))

, я думаю, это поможет решить вашу проблему. Мысли

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...