Kafka Streams не будет фиксировать смещения для этого случая, так как он обеспечивает как минимум однократные гарантии обработки (на самом деле, даже невозможно перенастроить Kafka Streams по-другому - возможны только более сильные гарантии ровно за один раз). Кроме того, Kafka Streams всегда отключает автоматическую фиксацию на потребителе (и не позволяет вам ее включить), поскольку Kafka Streams сама управляет смещением коммитов.
Если вы запускаете с настройками по умолчанию, производитель должен фактически сгенерировать исключение, и соответствующий поток должен умереть - вы можете получить обратный вызов, если поток умирает, зарегистрировав KafkaStreams#uncaughtExceptionHandler()
.
Вы также можете наблюдать KafkaStreams#state()
(или зарегистрировать обратный вызов KafkaStreams#setStateListener()
). Состояние перейдет к DEAD
, если все потоки не работают (обратите внимание, в старой версии была ошибка, для которой состояние все еще было RUNNING
для этого случая: https://issues.apache.org/jira/browse/KAFKA-5372)
Следовательно, приложение не должно находиться в исправном состоянии, и Kafka Streams не будет повторять вводимое сообщение, но останавливает обработку, и вам потребуется перезапустить клиент. При перезапуске будет перечитано сообщение о неудачном входе и попытка записи в тему вывода.
Если вы хотите, чтобы Kafka Streams повторял попытку, вам нужно увеличить конфигурацию производителя reties
, чтобы производитель не выдавал исключение и не повторял внутреннюю запись. Это может «заблокировать» дальнейшую обработку в конечном итоге, если буфер записи производителя заполнится.