Я не знаю насчет streamparse, но у меня сложилось впечатление, что вы хотите связать кортежи и записать их как пакет.Допустим, вы записали до смещения 10. Теперь ваш болт получает смещение 11-15, и партия не может записать.Смещение 15-20 ставится в очередь, и вы не хотите обрабатывать их прямо сейчас, потому что это будет обрабатывать партии не по порядку.
Правильно ли это понимание?
Во-первых, я бы отбросил вручнуюсовершать смещения.Вы должны позволить носику справиться с этим.Предполагая, что вы используете storm-kafka-client
, вы можете настроить его на фиксацию смещений только после того, как соответствующий кортеж и все предыдущие кортежи были подтверждены.
Что вам, вероятно, следует сделать, это следить за болтом (или даже лучше,в вашей базе данных) какое наибольшее смещение было в неудачном пакете.Затем, когда ваш болт не может записать смещение 11-15, вы можете заставить болт выходить из строя каждый кортеж с помощью offset > 15
.В какой-то момент вы снова получите смещение 11-15 и сможете повторить запись пакета.Поскольку все сообщения с ошибкой offset > 15
были неудачными, они также будут повторены и будут доставлены после сообщений в неудачном пакете.
Это решение предполагает, что вы не выполняете переупорядочение потока сообщений между носиком и болтом писателя, поэтому сообщения поступают на болт в порядке их отправки.