Звучит так, как будто вы хотите использовать оконные болты https://storm.apache.org/releases/2.0.0-SNAPSHOT/Windowing.html. Возможно, вы хотите, чтобы окно было опрокидывающимся (то есть без перекрытия между оконными интервалами)
Оконные болты позволяют вам установить интервал, при котором они должны испускать окна (например,каждые 10 секунд), а затем болт буферизует все кортежи, которые он получает за предыдущие 10 секунд, прежде чем вызывать метод execute, который вы предоставляете.
Структура, которую я думаю, вам нужна, например,
spout -> splitter -> 5 second window bolt
-> 10 second window bolt
Сплиттер должен получить кортежи, исследовать поле частоты и отправить кортеж на правый оконный болт.Вы заставляете это делать, объявляя поток для каждого типа частоты.
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare("5-sec-stream", ...);
declarer.declare("10-sec-stream", ...);
}
public void execute(Tuple input) {
if (frequencyIsFive(input)) {
collector.emit("5-sec-stream", new Values(input.getValues()))
}
//more cases here
}
Затем, когда объявляете свою топологию, вы делаете
topologyBuilder.setBolt("splitter", new SplitterBolt())
.shuffleGrouping("spout")
topologyBuilder.setBolt("5-second-window", new YourWindowingBolt())
.globalGrouping("splitter", "5-sec-stream")
, чтобы заставить все 5-секундные кортежи перейти к 5второй оконный болт.
См. https://storm.apache.org/releases/2.0.0-SNAPSHOT/Concepts.html для получения дополнительной информации об этом, в частности, о потоках и группировках.
Простой пример топологии окон в https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java.
Одна вещь, о которой вы, возможно, захотите знать, - это время ожидания кортежа Storm.Если вам нужно окно, например, 10 минут, вам нужно значительно увеличить время ожидания кортежа по умолчанию, равное 30 секундам, чтобы кортежи не останавливались во время ожидания в очереди.Вы можете сделать это, установив, например, conf.setMessageTimeoutSecs(15*60)
при настройке топологии.Вы хотите, чтобы между интервалами окон и тайм-аутом кортежа была небольшая задержка, потому что вы хотите избежать как можно большего времени ожидания кортежей.