ProducerFencedException после ребалансировки группы потребителей кафки - PullRequest
0 голосов
/ 21 апреля 2020

Я не могу комментировать похожие topi c: Префикс TransactionId для записи только для производителя и чтения-процесса-записи - ProducerFencedException , поэтому я задам новый вопрос.

Вариант использования:

  • Одна топи c с 2 разделами
  • Spring @KafkaListener с параллелизмом = 1 (по умолчанию) в группе потребителей "sample-consumer-group"
  • Два экземпляра одного и того же приложения - оба с одинаковым «префиксом-идентификатора транзакции»

1) Я запускаю первый экземпляр приложения (давайте назовем его «instance1» - даже если все в порядке - один потребитель подписывается на оба раздела. Журнал:

o.s.k.l.KafkaMessageListenerContainer : sample-consumer-group: partitions assigned: [sampleTopic-1, sampleTopic-0]

2) Я запускаю второй экземпляр приложения (instance2) - все выглядит нормально - журнал из этого экземпляра:

o.s.k.l.KafkaMessageListenerContainer : sample-consumer-group: partitions assigned: [sampleTopic-1]

log из "instance1":

o.s.k.l.KafkaMessageListenerContainer    : sample-consumer-group: partitions revoked: [sampleTopic-1, sampleTopic-0]
o.s.k.l.KafkaMessageListenerContainer    : sample-consumer-group: partitions assigned: [sampleTopic-0]

Все еще кажется нормальным ... Но, когда я тогда пытаюсь отправить сообщение на любой другой topi c (не из kafkaListener, а из некоторый метод @Transactional - так что это только транзакция производителя) следующие ошибки oc cur:

ERROR 4395 --- [roducer-tx-prefix-0] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-tx-prefix-0, transactionalId=tx-prefix-0] Aborting producer batches due to fatal error

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

ERROR 4395 --- [roducer-tx-prefix-0] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='sync-register' and payload='2020-04-21T13:52:12.148412Z' to topic anotherTopic

Так это связано с проблемой, что у меня должен быть отдельный префикс-идентификатора транзакции для каждого экземпляра для транзакций только производителя? Если да, каков текущий статус этого и как этого добиться, не используя отдельный kafkaTemplate для транзакций, запущенных потребителем и запущенных производителем?

1 Ответ

1 голос
/ 21 апреля 2020

См. документацию .

. Как уже упоминалось в обзоре, фабрика производителей настраивается с помощью этого свойства для создания свойства транзакции provider.id. При указании этого свойства существует некоторая дихотомия, заключающаяся в том, что при запуске нескольких экземпляров приложения оно должно быть одинаковым во всех экземплярах для удовлетворения ограждающих зомби (также упоминается в обзоре) при создании записей в потоке контейнера слушателя. Однако при создании записей с использованием транзакций, которые не запускаются контейнером слушателя, префикс должен быть разным для каждого экземпляра. Версия 2.3 упрощает настройку, особенно в приложении Spring Boot. В предыдущих версиях вам приходилось создавать две фабрики производителей и KafkaTemplate s - одну для создания записей в потоке контейнера слушателя, а другую для автономных транзакций, запускаемых kafkaTemplate.executeInTransaction () или перехватчиком транзакций в методе @Transactional.

Теперь вы можете переопределить фабричный транзакционныйIdPrefix на KafkaTemplate и KafkaTransactionManager.

При использовании диспетчера транзакций и шаблона для контейнера слушателя вы обычно оставляете это значение по умолчанию для свойства фабрики производителя. , Это значение должно быть одинаковым для всех экземпляров приложения. Для транзакций, запускаемых шаблоном (или диспетчером транзакций для @Transaction), вы должны установить свойство в шаблоне и диспетчере транзакций соответственно. Это свойство должно иметь разные значения для каждого экземпляра приложения.

...