Использование MatchFiles () в конвейере лучей apache для получения имени файла и анализа json в python - PullRequest
0 голосов
/ 24 сентября 2019

У меня много json-файлов в корзине, и с помощью python 3 я хочу получить имя файла, а затем создать пару ключей и значений файлов и прочитать их.Я считаю, что файлы соответствия теперь работают для python, но мне было интересно, как бы я это реализовал:

files = p | fileio.MatchFiles("gs://mybuckenumerate/*.json") 
    | #Ideally want to create a tuple of filename, json row which I will pass into a ParDo that is a custom class that parses the json

Цель, скажем, у меня было 10 файлов в корзине:

gs://mybucket/myfile1.json
gs://mybucket/myfile2.json

Ивсе json-файлы в корзине имеют одинаковую структуру

Я передаю ее в пользовательский класс ParseFile (я думаю, что через ParDo мои знания Apache Beam ограничены), и для каждой строки в json я выводлю словарь(который я сохраню в json с разделителями новой строки), где одним из ключей является имя файла.

Редактировать 9/24 11:15 утра pst: вот что я попробовал

file_content_pairs = (p 
                | fileio.MatchFiles(known_args.input_bucket)
                | fileio.ReadMatches()
                | beam.Map(lambda file: (file.metadata.path, json.loads(file.read_utf8())))
                | beam.ParDo(TestThis())
                )

TestThis () просто должен напечатать содержимое:

class TestThis(beam.DoFn):

    def process(self, element):
        print(element)
        print("stop")
        yield element

Но все, что я вижу в моих выходных данных: INFO: root: Закончено перечисление 2 файлов за 1,2762866020202637 секунд.

1 Ответ

0 голосов
/ 24 сентября 2019

я не поняла.Вы хотите, чтобы пары ключ-значение были (filename, json-parsed-contents)?

Если это так, вы бы:

file_content_pairs = (
  p | fileio.MatchFiles("gs://mybucketname/*.json")
    | fileio.ReadMatches()
    | beam.Map(lambda file: (file.metadata.path, json.loads(file.read_utf8()))
)

Итак, если ваш файл выглядит так:

==============myfile.json===============
{"a": "b",
 "c": "d",
 "e": 1}

Тогда ваша коллекция file_content_pairs будет содержать пару ключ-значение ("myfile.json", {"a":"b", "c": "d", "e": 1}).


Если ваш файл в формате json lines, вы должны сделать:

def consume_file(f):
  other_name = query_bigquery(f.metadata.path)
  return [(other_name, json.loads(line))
          for line in f.read_utf8().strip().split('\n')]

with Pipeline() as p:
  result = (p
            | fileio.MatchFiles("gs://mybucketname/*.json")
            | fileio.ReadMatches()
            | beam.FlatMap(consume_file))
...