Падающее окно в Storm с количеством и продолжительностью - PullRequest
0 голосов
/ 27 июня 2018

Как создать Tumbling Window в Storm с обоими пороговыми значениями. Например, если я установил WindowCount равным 500, а WindowDuration равным 5 секундам, окно должно быть обработано, даже если имеется менее 500 сообщений, но прошло 5 секунд. Я мог видеть независимые API для обеих функций

Для графа

.tumblingWindow(1000, windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))

Для времени

.tumblingWindow(Duration.seconds(5), windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))

Могу ли я иметь комбинацию обоих?

Если я настрою с помощью MessageCount, а не Duration, что произойдет с моими сообщениями, когда я остановлю топологию? Будет ли Storm обрабатывать эти сообщения, даже если счетчик пакетов не получен? Или я потеряю эти сообщения?

1 Ответ

0 голосов
/ 26 октября 2018

Я не верю, что вы можете сделать это с текущим оконным API.

Код достаточно подключаемый, чтобы сделать его внутренним, но необходимый API-интерфейс не раскрывается. Есть два интерфейса для определения того, как обрабатываются окна.

TriggerPolicy решает, когда доставлять окна к болту (например, «доставить, когда 100 кортежей буферизовано»).

EvictionPolicy решает, когда изгнать кортежи из текущего окна (например, «отбросить кортежи, если они находятся на расстоянии более 500 кортежей за последним кортежем в окне»).

Вы настраиваете эти политики косвенно, например, через. BaseWindowedBolt.withWindowLength , которая внутренне просто устанавливает некоторые свойства конфигурации. Эти свойства используются для определения политики триггера / исключения в WindowedBoltExecutor .

Я думаю, что нужно либо разрешить пользователям предоставлять свои собственные пользовательские TriggerPolicy / EvictionPolicy, либо поочередно добавить новый Trigger / EvictionPolicy, который делает то, что вы хотите.

Если вы хотите поднять проблему для этого, вы можете сделать это на https://issues.apache.org/jira/projects/STORM. Если вы хотите внести код, источник доступен на https://github.com/apache/storm,, где вы также можете поднять PR ,

...