Поток данных: поиск предыдущего события в потоке событий - PullRequest
1 голос
/ 21 марта 2019

Возобновление того, что я ищу для работы с Apache Beam в Google Dataflow, выглядит примерно так: LAG в Azure Stream Analytics

Использование окна X минут, в котором я получаюdata:

||||||  ||||||  ||||||  ||||||  ||||||  ||||||
|  1 |  |  2 |  |  3 |  |  4 |  |  5 |  |  6 | 
|id=x|  |id=x|  |id=x|  |id=x|  |id=x|  |id=x| 
|||||| ,|||||| ,|||||| ,|||||| ,|||||| ,|||||| , ...

Мне нужно сравнить данные (n) с данными (n-1), например, следуя предыдущему примеру, это будет примерно так:

if data(6) inside and data(5)  outside then ... 
if data(5) inside and data(4)  outside then ... 
if data(4) inside and data(3)  outside then ... 
if data(3) inside and data(2)  outside then ... 
if data(2) inside and data(1)  outside then ... 

Есть ли "практический" способ сделать это?

1 Ответ

1 голос
/ 22 марта 2019

С помощью Beam, как описано в документах , состояние сохраняется для каждого ключа и окна.Поэтому вы не можете получить доступ к значениям из предыдущих окон.

Чтобы сделать то, что вы хотите сделать, вам может потребоваться более сложный дизайн конвейера.Моя идея, разработанная здесь в качестве примера, состоит в том, чтобы дублировать ваши сообщения в ParDo:

  • . Передавая их без изменений на основной выход
  • Одновременно отправьте их набоковой вывод с задержкой в ​​одно окно

Чтобы сделать вторую точку маркера, мы можем добавить продолжительность окна (WINDOW_SECONDS) к отметке времени элемента:

class DuplicateWithLagDoFn(beam.DoFn):

  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    # Main output gets unmodified element
    yield element
    # The same element is emitted to the side output with a 1-window lag added to timestamp
    yield beam.pvalue.TaggedOutput('lag_output', beam.window.TimestampedValue(element, timestamp + WINDOW_SECONDS))

Мы вызываем функцию с указанием правильных тегов:

beam.ParDo(DuplicateWithLagDoFn()).with_outputs('lag_output', main='main_output')

и затем применяем одну и ту же схему управления окнами к обоим, группируем по ключу и т. Д.

windowed_main = results.main_output | 'Window main output' >> beam.WindowInto(window.FixedWindows(WINDOW_SECONDS))
windowed_lag = results.lag_output | 'Window lag output' >> beam.WindowInto(window.FixedWindows(WINDOW_SECONDS))

merged = (windowed_main, windowed_lag) | 'Join Pcollections' >> beam.CoGroupByKey()

Наконец, мы можемимеют оба значения (старое и новое) внутри одного и того же ParDo:

class CompareDoFn(beam.DoFn):

  def process(self, element):
    logging.info("Combined with previous vale: {}".format(element))

    try:
      old_value = int(element[1][1][0].split(',')[1])
    except:
      old_value = 0

    try:
      new_value = int(element[1][0][0].split(',')[1])
    except:
      new_value = 0

    logging.info("New value: {}, Old value: {}, Difference: {}".format(new_value, old_value, new_value - old_value))
    return (element[0], new_value - old_value)

Чтобы проверить это, я запускаю конвейер с прямым бегуном и в отдельной оболочке публикую два сообщения с интервалом более 10 секунд (в моем случае WINDOW_SECONDS было 10 с):

gcloud pubsub topics publish lag --message="test,120"
sleep 12
gcloud pubsub topics publish lag --message="test,40"

И выходные данные задания показывают ожидаемую разницу:

INFO:root:New message: (u'test', u'test,120')
INFO:root:Combined with previous vale: (u'test', ([u'test,120'], []))
INFO:root:New value: 120, Old value: 0, Difference: 120
INFO:root:New message: (u'test', u'test,40')
INFO:root:Combined with previous vale: (u'test', ([u'test,40'], [u'test,120']))
INFO:root:New value: 40, Old value: 120, Difference: -80
INFO:root:Combined with previous vale: (u'test', ([], [u'test,40']))
INFO:root:New value: 0, Old value: 40, Difference: -40

Полный код для моего примера здесь .При дублировании элементов учитывайте соображения производительности, но имеет смысл, если вам нужно, чтобы значения были доступны в двух окнах.

...