Никогда не будет хорошим решением перезапустить рабочих, вы можете потерять несколько кортежей.Лучше всего использовать функциональность надежности сообщений Storm, как ответил Рахим.
Однако, кроме шторма надежности сообщений, имеется механизм внутреннего обратного давления, это означает, что, когда носики вводят больше данных, чем болтыв состоянии обработки, носик будет автоматически замедляться.
Чтобы включить это, вам нужно сначала, как сказал Рахим, включить активацию.Это означает, что если ваша топология проста:
Излив -> Болт
Излив будет:
public void nextTuple(){
...
_collector.emit(new Values(tuple), tupleId);
}
@Override
publci void ack(Object msgId) { super.ack(msgId); }
Где tupleId просто будет инкрементным счетчиком count++
.Таким образом, вы объявляете Storm новый кортеж, ожидающий подтверждения.
Тем временем в последовательном болте и во всех последовательных болтах в топологии или, по крайней мере, до того, который вызовет узкое место, вы напишите:
public void execute(Tuple tuple){
...
_collector.emit(tuple, new Values(newTuple));
_collector.ack(tuple);
}
Таким образом, вы заметите Storm, что кортеж полностью обработан.
В вашем основном методе, где вы объявляете построителя топологии, вы, как минимум, не последнийнеобходимо определить максимальное количество кортежей, которое будет ожидать Spout:
Config conf = new Config();
conf.setMaxSpoutPending(100);
Таким образом, Spout начнет создавать новые кортежи, ожидающие их подтверждения, если (в этом случае) число ожидающихесли количество кортежей превышает 100, то spout перестанет вызывать метод nextTuple, ожидая их подтверждения и затем генерируя новые.
NB: значение 100 является лишь примером, вам, возможно, придется немного его настроитьчтобы оптимизировать его под вашу ситуацию.
Ссылок, которыми поделился Рахим, должно быть достаточно, чтобы понять механизм, во всяком случае, если вы хотите углубиться в тВ реализацию я добавляю эту ссылку: