Я работаю в среде разработки с 3 (докеризованными) брокерами kafka в моей системе.У брокеров для транзакции.state.log.replication.factor установлено значение 3.
В конфигурации потокового приложения я устанавливаю processing.guarantee как EXACTLY_ONCE, а в конфигурации потребительского приложения я устанавливаю изоляционный уровень как "read_committed".
Я проверил другие конфигурации на https://docs.confluent.io/current/streams/developer-guide/config-streams.html#processing-guarantee и настроил свою среду в соответствии с руководством.
После минуты производства сообщения из потокового приложения, которое считывает хранилище состояний ивыдает сообщение 100 с помощью функции context.forward (..), приложение-потребитель прекращает чтение, как будто в назначенных разделах нет зафиксированных сообщений.
Через некоторое время потоковое приложение вылетает со следующей ошибкой:
"Прерывание пакетов производителя из-за фатальной ошибки org.apache.kafka.common.errors.ProducerFencedException: Производитель попытался выполнить операцию со старой эпохой. Либо есть более новый производитель с тем же транзакционным ID,или транзакция производителя истекла у брокера. "
Похоже, что производитель потока не может получить подтверждение, и транзакция истекает.
Редактировать 1: Когда я останавливаю потоковое приложение, потребитель получает подтвержденные сообщения.