С помощью Beam, как описано в документах , состояние сохраняется для каждого ключа и окна.Поэтому вы не можете получить доступ к значениям из предыдущих окон.
Чтобы сделать то, что вы хотите сделать, вам может потребоваться более сложный дизайн конвейера.Моя идея, разработанная здесь в качестве примера, состоит в том, чтобы дублировать ваши сообщения в ParDo:
- . Передавая их без изменений на основной выход
- Одновременно отправьте их набоковой вывод с задержкой в одно окно
Чтобы сделать вторую точку маркера, мы можем добавить продолжительность окна (WINDOW_SECONDS
) к отметке времени элемента:
class DuplicateWithLagDoFn(beam.DoFn):
def process(self, element, timestamp=beam.DoFn.TimestampParam):
# Main output gets unmodified element
yield element
# The same element is emitted to the side output with a 1-window lag added to timestamp
yield beam.pvalue.TaggedOutput('lag_output', beam.window.TimestampedValue(element, timestamp + WINDOW_SECONDS))
Мы вызываем функцию с указанием правильных тегов:
beam.ParDo(DuplicateWithLagDoFn()).with_outputs('lag_output', main='main_output')
и затем применяем одну и ту же схему управления окнами к обоим, группируем по ключу и т. Д.
windowed_main = results.main_output | 'Window main output' >> beam.WindowInto(window.FixedWindows(WINDOW_SECONDS))
windowed_lag = results.lag_output | 'Window lag output' >> beam.WindowInto(window.FixedWindows(WINDOW_SECONDS))
merged = (windowed_main, windowed_lag) | 'Join Pcollections' >> beam.CoGroupByKey()
Наконец, мы можемимеют оба значения (старое и новое) внутри одного и того же ParDo:
class CompareDoFn(beam.DoFn):
def process(self, element):
logging.info("Combined with previous vale: {}".format(element))
try:
old_value = int(element[1][1][0].split(',')[1])
except:
old_value = 0
try:
new_value = int(element[1][0][0].split(',')[1])
except:
new_value = 0
logging.info("New value: {}, Old value: {}, Difference: {}".format(new_value, old_value, new_value - old_value))
return (element[0], new_value - old_value)
Чтобы проверить это, я запускаю конвейер с прямым бегуном и в отдельной оболочке публикую два сообщения с интервалом более 10 секунд (в моем случае WINDOW_SECONDS
было 10 с):
gcloud pubsub topics publish lag --message="test,120"
sleep 12
gcloud pubsub topics publish lag --message="test,40"
И выходные данные задания показывают ожидаемую разницу:
INFO:root:New message: (u'test', u'test,120')
INFO:root:Combined with previous vale: (u'test', ([u'test,120'], []))
INFO:root:New value: 120, Old value: 0, Difference: 120
INFO:root:New message: (u'test', u'test,40')
INFO:root:Combined with previous vale: (u'test', ([u'test,40'], [u'test,120']))
INFO:root:New value: 40, Old value: 120, Difference: -80
INFO:root:Combined with previous vale: (u'test', ([], [u'test,40']))
INFO:root:New value: 0, Old value: 40, Difference: -40
Полный код для моего примера здесь .При дублировании элементов учитывайте соображения производительности, но имеет смысл, если вам нужно, чтобы значения были доступны в двух окнах.