Скользящие окна Python Apache Beam дублируют данные - PullRequest
0 голосов
/ 14 ноября 2018

Проблема

Каждый раз, когда система получает сообщение от pubsub с раздвижными окнами, оно дублируется


Код

 | 'Parse dictionary' >> beam.Map(lambda elem: (elem['Serial'], int(elem['Value'])))    
 | 'window' >> beam.WindowInto(window.SlidingWindows(30, 15),accumulation_mode=AccumulationMode.DISCARDING)
 | 'Count' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())

Вывод

Если я отправлю только одно сообщение из pub / sub и попытаюсь напечатать то, что у меня есть, после того, как скользящее окно закончится скод:

class print_row2(beam.DoFn):
    def process(self, row=beam.DoFn.ElementParam, window=beam.DoFn.WindowParam,timestamp=beam.DoFn.TimestampParam):
        print row, timestamp2str(float(window.start)), timestamp2str(float(window.end)),timestamp2str(float(timestamp))

Результат

('77777', 120.0) 2018-11-16 08:21:15.000 2018-11-16 08:21:45.000 2018-11-16 08:21:45.000
('77777', 120.0) 2018-11-16 08:21:30.000 2018-11-16 08:22:00.000 2018-11-16 08:22:00.000

Если я распечатаю сообщение до 'window' >> beam.WindowInto(window.SlidingWindows(30, 15)), я получу только один раз


Процесс в "графическом режиме":

  time: ----t+00---t+15---t+30----t+45----t+60------>
             :      :      :       :       :
  w1:        |=X===========|       :       :
  w2:               |==============|       :
  ...

Сообщение X было отправлено только один раз в начале скользящего окна, оно должно быть получено только один раз, но принимается дважды

Я пробовал оба значения AccumulationMode, а также триггер = AftyerWatermark, но я не могу решить проблему.

Что может быть не так?


Extra

С FixedWindows это правильный код для моего porpouse:

| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| 'Speed Average' >> beam.GroupByKey()
| "Calculating average" >> beam.CombineValues(beam.combiners.MeanCombineFn())

или

| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| "Calculating average" >> beam.CombinePerKey(beam.combiners.MeanCombineFn())

1 Ответ

0 голосов
/ 15 ноября 2018

Все элементы, которые принадлежат окну, испускаются. Если элемент принадлежит нескольким окнам, он будет испущен в каждом окне.

Режим накопления имеет значение, только если вы планируете обрабатывать запаздывающие данные / многократные срабатывания триггера. В этом случае режим сброса дает вам только новые элементы в окне при повторном срабатывании триггера, то есть испускает только те элементы, которые поступили в то же окно с момента предыдущего срабатывания триггера, элементы, которые уже были отправлены, больше не испускаются и сбрасываются. В режиме накопления все окно будет генерироваться при каждом срабатывании триггера, оно будет включать старые элементы, которые были выпущены в прошлый раз, и новые элементы, которые поступили с тех пор.

Если я понимаю ваш пример, у вас есть раздвижные окна, они имеют длину 30 секунд и запускаются каждые 15 секунд. Таким образом, они перекрываются в течение 15 секунд:

  time: ----t+00---t+15---t+30----t+45----t+60------>
             :      :      :       :       :
  w1:        |=============|       :       :
  w2:               |==============|       :
  w3:                      |===============|
  ...

Таким образом, любой элемент в вашем случае будет принадлежать как минимум двум окнам (кроме первого и последнего окон).

например. в вашем примере, если ваше сообщение было отправлено между 17:07:15 и 17:07:30, оно появится в обоих окнах.

Исправлено, что окна не перекрываются, поэтому элемент может принадлежать только одному окну:

  time: ----t+00---t+15---t+30----t+45----t+60------>
             :             :               :
  w1:        |=============|               :
  w2:                      |===============|
  w3:                                      |====...
  ...

Подробнее об окнах здесь: https://beam.apache.org/documentation/programming-guide/#windowing

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...