В этом вопросе нет подробной информации о вашем сценарии использования, поэтому вам может потребоваться адаптировать некоторые части следующего примера.Один из способов сделать это - сгруппировать элементы, используя в качестве ключа окно, к которому они принадлежат.Затем мы используем filesystems.FileSystems.create
, чтобы управлять тем, как мы хотим записать файлы.
Здесь я буду использовать окна 10 с и некоторые фиктивные данные, где события разделены по 4 с каждый.Генерируется с помощью:
data = [{'event': '{}'.format(event), 'timestamp': time.time() + 4*event} for event in range(10)]
Мы используем поле timestamp
для назначения метки времени элемента (это просто для того, чтобы эмулировать события Pub / Sub контролируемым образом).Мы создаем окна событий, используем информацию о окнах в качестве ключа, группируем по ключу и записываем результаты в папку output
:
events = (p
| 'Create Events' >> beam.Create(data) \
| 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['timestamp'])) \
| 'Add Windows' >> beam.WindowInto(window.FixedWindows(10)) \
| 'Add Window Info' >> beam.ParDo(AddWindowingInfoFn()) \
| 'Group By Window' >> beam.GroupByKey() \
| 'Windowed Writes' >> beam.ParDo(WindowedWritesFn('output/')))
Где AddWindowingInfoFn
довольно просто:
class AddWindowingInfoFn(beam.DoFn):
"""output tuple of window(key) + element(value)"""
def process(self, element, window=beam.DoFn.WindowParam):
yield (window, element)
и WindowedWritesFn
записывают в путь, который мы указали в конвейере (папка output/
в моем случае).Затем я использую информацию окна для имени файла.Для удобства я конвертирую метки времени эпохи в удобочитаемые даты.Наконец, мы перебираем все элементы и записываем их в соответствующий файл.Конечно, это поведение можно настроить по желанию в этой функции:
class WindowedWritesFn(beam.DoFn):
"""write one file per window/key"""
def __init__(self, outdir):
self.outdir = outdir
def process(self, element):
(window, elements) = element
window_start = str(window.start.to_utc_datetime()).replace(" ", "_")
window_end = str(window.end.to_utc_datetime()).replace(" ", "_")
writer = filesystems.FileSystems.create(self.outdir + window_start + ',' + window_end + '.txt')
for row in elements:
writer.write(str(row)+ "\n")
writer.close()
Это позволит записывать элементы, принадлежащие каждому окну, в другой файл.В моем случае есть 5 разных
$ ls output/
2019-05-21_19:01:20,2019-05-21_19:01:30.txt
2019-05-21_19:01:30,2019-05-21_19:01:40.txt
2019-05-21_19:01:40,2019-05-21_19:01:50.txt
2019-05-21_19:01:50,2019-05-21_19:02:00.txt
2019-05-21_19:02:00,2019-05-21_19:02:10.txt
Первый содержит только элемент 0 (это зависит от исполнения):
$ cat output/2019-05-21_19\:01\:20\,2019-05-21_19\:01\:30.txt
{'timestamp': 1558465286.933727, 'event': '0'}
Второй содержит элементы с 1 по 3 ии так далее:
$ cat output/2019-05-21_19\:01\:30\,2019-05-21_19\:01\:40.txt
{'timestamp': 1558465290.933728, 'event': '1'}
{'timestamp': 1558465294.933728, 'event': '2'}
{'timestamp': 1558465298.933729, 'event': '3'}
Предостережение при таком подходе состоит в том, что все элементы из одного окна сгруппированы в одного и того же работника.Это может произойти в любом случае, если запись в один осколок или выходной файл соответствует вашему случаю, но для более высоких нагрузок вам может потребоваться рассмотреть более крупные типы машин.
Полный код здесь