Как написать разделяемый DoFn в python - преобразовать json в nd json в apache луч - PullRequest
1 голос
/ 01 февраля 2020

У меня большой набор данных в GCS в формате json, который мне нужно загрузить в BigQuery. Проблема в том, что данные json хранятся не в Nd Json, а в нескольких больших json файлах, где каждый ключ в JSON должен быть действительно полем в json.

Например - следующее Json:

{
  "johnny": {
    "type": "student"
  }, 
  "jeff": {
    "type": "teacher"
  }
}

должно быть преобразовано в

[ 
  {
    "name": "johnny",
    "type": "student"
  }, 
  {
    "name": "jeff",
    "type": "teacher"
  }
]

Я пытаюсь решить эту проблему с помощью потока данных Google и Apache Beam, но производительность ужасна, так как ech "Worker" должен проделать большую работу:

class JsonToNdJsonDoFn(beam.DoFn):
    def __init__(self, pk_field_name):
        self.__pk_field_name = pk_field_name

    def process(self, line):
        for key, record in json.loads(line).items():
            record[self.__pk_field_name] = key
            yield record

Я знаю, что это можно как-то решить, реализовав его как SplittableDoFn - но реализацию пример в Python там не совсем понятен. Как мне создать этот DoFn как разделяемый и как он будет использоваться как часть конвейера?

1 Ответ

0 голосов
/ 01 апреля 2020

Вам нужен способ указать частичный диапазон для обработки файла json. Например, это может быть диапазон байтов.

Пример Avro в блоге - хороший пример. Что-то вроде:

class MyJsonReader(DoFn):
  def process(filename, tracker=DoFn.RestrictionTrackerParam)
    with fileio.ChannelFactory.open(filename) as file:
      start, stop = tracker.current_restriction()
      # Seek to the first block starting at or after the start offset.
      file.seek(start)
      next_record_start = find_next_record(file, start)
      while start:
        # Claim the position of the current record
        if not tracker.try_claim(next_record_start):
          # Out of range of the current restriction - we're done.
          return
        # start will point to the end of the record that was read
        record, start = read_record(file, next_record_start)
        yield record

  def get_initial_restriction(self, filename):
    return (0, fileio.ChannelFactory.size_in_bytes(filename))

Однако json не имеет четких границ записи, поэтому, если ваша работа должна начинаться с байта 548, нет четкого способа определить, сколько нужно сдвинуть. Если файл в буквальном смысле соответствует тому, что у вас есть, вы можете пропускать байты, пока не увидите шаблон "<string>": {. А затем прочитайте объект json, начиная с {.

...