Проблема
Каждый раз, когда система получает сообщение от pubsub с раздвижными окнами, оно дублируется
Код
| 'Parse dictionary' >> beam.Map(lambda elem: (elem['Serial'], int(elem['Value'])))
| 'window' >> beam.WindowInto(window.SlidingWindows(30, 15),accumulation_mode=AccumulationMode.DISCARDING)
| 'Count' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
Вывод
Если я отправлю только одно сообщение из pub / sub и попытаюсь напечатать то, что у меня есть, после того, как скользящее окно закончится скод:
class print_row2(beam.DoFn):
def process(self, row=beam.DoFn.ElementParam, window=beam.DoFn.WindowParam,timestamp=beam.DoFn.TimestampParam):
print row, timestamp2str(float(window.start)), timestamp2str(float(window.end)),timestamp2str(float(timestamp))
Результат
('77777', 120.0) 2018-11-16 08:21:15.000 2018-11-16 08:21:45.000 2018-11-16 08:21:45.000
('77777', 120.0) 2018-11-16 08:21:30.000 2018-11-16 08:22:00.000 2018-11-16 08:22:00.000
Если я распечатаю сообщение до 'window' >> beam.WindowInto(window.SlidingWindows(30, 15))
, я получу только один раз
Процесс в "графическом режиме":
time: ----t+00---t+15---t+30----t+45----t+60------>
: : : : :
w1: |=X===========| : :
w2: |==============| :
...
Сообщение X было отправлено только один раз в начале скользящего окна, оно должно быть получено только один раз, но принимается дважды
Я пробовал оба значения AccumulationMode, а также триггер = AftyerWatermark, но я не могу решить проблему.
Что может быть не так?
Extra
С FixedWindows это правильный код для моего porpouse:
| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| 'Speed Average' >> beam.GroupByKey()
| "Calculating average" >> beam.CombineValues(beam.combiners.MeanCombineFn())
или
| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| "Calculating average" >> beam.CombinePerKey(beam.combiners.MeanCombineFn())