Сдвигается ли пользовательское смещение, даже если не удается опубликовать тему вывода в Kafka Streams? - PullRequest
1 голос
/ 31 мая 2019

Если у меня есть потоковое приложение Kafka, которое не может опубликовать в теме (поскольку тема не существует), фиксирует ли оно смещение потребителя и продолжает ли оно, или будет зацикливаться на том же сообщении, пока не сможет разрешить выходную тему?Приложение просто печатает ошибку и работает нормально, в отличие от того, что я могу наблюдать.

Пример ошибки при попытке опубликовать в теме:

Error while fetching metadata with correlation id 80 : {super.cool.test.topic=UNKNOWN_TOPIC_OR_PARTITION}

Inпо моему мнению, это будет просто вращаться на том же сообщении, пока проблема не будет решена, чтобы не потерять данные?Я не смог найти четкого ответа о поведении по умолчанию.Мы не отключили автокоммитирование или что-то в этом роде, большинство настроек установлены по умолчанию.

Я спрашиваю, поскольку мы не хотим оказаться в ситуации, когда проверка работоспособности в порядке.(приложение работает во время печати ошибок для регистрации), и мы просто выбрасываем тонны сообщений Kafka.

1 Ответ

2 голосов
/ 03 июня 2019

Kafka Streams не будет фиксировать смещения для этого случая, так как он обеспечивает как минимум однократные гарантии обработки (на самом деле, даже невозможно перенастроить Kafka Streams по-другому - возможны только более сильные гарантии ровно за один раз). Кроме того, Kafka Streams всегда отключает автоматическую фиксацию на потребителе (и не позволяет вам ее включить), поскольку Kafka Streams сама управляет смещением коммитов.

Если вы запускаете с настройками по умолчанию, производитель должен фактически сгенерировать исключение, и соответствующий поток должен умереть - вы можете получить обратный вызов, если поток умирает, зарегистрировав KafkaStreams#uncaughtExceptionHandler().

Вы также можете наблюдать KafkaStreams#state() (или зарегистрировать обратный вызов KafkaStreams#setStateListener()). Состояние перейдет к DEAD, если все потоки не работают (обратите внимание, в старой версии была ошибка, для которой состояние все еще было RUNNING для этого случая: https://issues.apache.org/jira/browse/KAFKA-5372)

Следовательно, приложение не должно находиться в исправном состоянии, и Kafka Streams не будет повторять вводимое сообщение, но останавливает обработку, и вам потребуется перезапустить клиент. При перезапуске будет перечитано сообщение о неудачном входе и попытка записи в тему вывода.

Если вы хотите, чтобы Kafka Streams повторял попытку, вам нужно увеличить конфигурацию производителя reties, чтобы производитель не выдавал исключение и не повторял внутреннюю запись. Это может «заблокировать» дальнейшую обработку в конечном итоге, если буфер записи производителя заполнится.

...