Запись в один файл для каждого окна в потоке данных с использованием Python - PullRequest
0 голосов
/ 21 мая 2019

После чтения данных из неограниченного источника, такого как pub / sub, я применяю управление окнами.Мне нужно записать все записи принадлежащие окну в отдельный файл.Я нашел this в Java, но ничего не смог найти в python.

1 Ответ

1 голос
/ 21 мая 2019

В этом вопросе нет подробной информации о вашем сценарии использования, поэтому вам может потребоваться адаптировать некоторые части следующего примера.Один из способов сделать это - сгруппировать элементы, используя в качестве ключа окно, к которому они принадлежат.Затем мы используем filesystems.FileSystems.create, чтобы управлять тем, как мы хотим записать файлы.

Здесь я буду использовать окна 10 с и некоторые фиктивные данные, где события разделены по 4 с каждый.Генерируется с помощью:

data = [{'event': '{}'.format(event), 'timestamp': time.time() + 4*event} for event in range(10)]

Мы используем поле timestamp для назначения метки времени элемента (это просто для того, чтобы эмулировать события Pub / Sub контролируемым образом).Мы создаем окна событий, используем информацию о окнах в качестве ключа, группируем по ключу и записываем результаты в папку output:

events = (p
  | 'Create Events' >> beam.Create(data) \
  | 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['timestamp'])) \
  | 'Add Windows' >> beam.WindowInto(window.FixedWindows(10)) \
  | 'Add Window Info' >> beam.ParDo(AddWindowingInfoFn()) \
  | 'Group By Window' >> beam.GroupByKey() \
  | 'Windowed Writes' >> beam.ParDo(WindowedWritesFn('output/')))

Где AddWindowingInfoFn довольно просто:

class AddWindowingInfoFn(beam.DoFn):
  """output tuple of window(key) + element(value)"""
  def process(self, element, window=beam.DoFn.WindowParam):
    yield (window, element)

и WindowedWritesFn записывают в путь, который мы указали в конвейере (папка output/ в моем случае).Затем я использую информацию окна для имени файла.Для удобства я конвертирую метки времени эпохи в удобочитаемые даты.Наконец, мы перебираем все элементы и записываем их в соответствующий файл.Конечно, это поведение можно настроить по желанию в этой функции:

class WindowedWritesFn(beam.DoFn):
    """write one file per window/key"""
    def __init__(self, outdir):
        self.outdir = outdir

    def process(self, element):
        (window, elements) = element
        window_start = str(window.start.to_utc_datetime()).replace(" ", "_")
        window_end = str(window.end.to_utc_datetime()).replace(" ", "_")
        writer = filesystems.FileSystems.create(self.outdir + window_start + ',' + window_end + '.txt')

        for row in elements:
          writer.write(str(row)+ "\n")

        writer.close()

Это позволит записывать элементы, принадлежащие каждому окну, в другой файл.В моем случае есть 5 разных

$ ls output/
2019-05-21_19:01:20,2019-05-21_19:01:30.txt
2019-05-21_19:01:30,2019-05-21_19:01:40.txt
2019-05-21_19:01:40,2019-05-21_19:01:50.txt
2019-05-21_19:01:50,2019-05-21_19:02:00.txt
2019-05-21_19:02:00,2019-05-21_19:02:10.txt

Первый содержит только элемент 0 (это зависит от исполнения):

$ cat output/2019-05-21_19\:01\:20\,2019-05-21_19\:01\:30.txt 
{'timestamp': 1558465286.933727, 'event': '0'}

Второй содержит элементы с 1 по 3 ии так далее:

$ cat output/2019-05-21_19\:01\:30\,2019-05-21_19\:01\:40.txt 
{'timestamp': 1558465290.933728, 'event': '1'}
{'timestamp': 1558465294.933728, 'event': '2'}
{'timestamp': 1558465298.933729, 'event': '3'}

Предостережение при таком подходе состоит в том, что все элементы из одного окна сгруппированы в одного и того же работника.Это может произойти в любом случае, если запись в один осколок или выходной файл соответствует вашему случаю, но для более высоких нагрузок вам может потребоваться рассмотреть более крупные типы машин.

Полный код здесь

...