Я пытаюсь добиться ровно однократной обработки каждого сообщения на kafka topi c. Вот моя конфигурация:
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 25);
config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 4096000);
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 120000);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 600000);
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 8192000);
Я установил Acknolwedgement Mode
как РУЧНОЙ и параллелизм на 2.
Тем не менее он принимает сообщения более одного раза. Кто-нибудь сталкивался с этой проблемой. Кроме того, в описанной выше конфигурации потребитель всегда получает только одно сообщение в одном пакете. Я попытался увеличить fetch.min.bytes
и fetch.max.wait.ms
, но это никак не отразилось.
Проблема с пакетной конфигурацией решена после внесения изменений в ConcurrentKafkaListenerContainerFactory следующим образом:
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3600000);
factory.getContainerProperties () setAckMode (org.springframework.kafka.listner.ContainerProperties.AckMode.MANUAL). factory.setMessageConverter (новый BatchMessagingMessageConverter (stringJsonMessageConverter ()));