У меня есть слушатель kafka, настроенный в нашем приложении Spring Boot следующим образом:
@KafkaListener(topicPartitions = @TopicPartition(topic = 'data.all', partitions = { "0", "1", "2" }), groupId = "kms")
public void listen(ObjectNode message) throws JsonProcessingException {
// Code to convert to json string and write to ElasticSearch
}
Это приложение развертывается и запускается на 3 серверах, и, несмотря на то, что все они имеют идентификатор группы kms
, все они получают копию сообщения, что означает, что я получаю 3 идентичные записи в Elastic. Когда я запускаю экземпляр локально, пишутся 4 копии.
Я подтвердил, что продюсер пишет только 1 сообщение в тему, проверяя количество всех сообщений в теме до и после записи; оно увеличивается только на 1. Как я могу предотвратить это?