Можем ли мы добиться ровно одной обработки сообщения, используя пакетный прослушиватель Spring Kafka? - PullRequest
0 голосов
/ 13 апреля 2020

Я пытаюсь добиться ровно однократной обработки каждого сообщения на kafka topi c. Вот моя конфигурация:

config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 25);
config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 4096000);
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 120000);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 600000);  
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 8192000);

Я установил Acknolwedgement Mode как РУЧНОЙ и параллелизм на 2.

Тем не менее он принимает сообщения более одного раза. Кто-нибудь сталкивался с этой проблемой. Кроме того, в описанной выше конфигурации потребитель всегда получает только одно сообщение в одном пакете. Я попытался увеличить fetch.min.bytes и fetch.max.wait.ms, но это никак не отразилось.

Проблема с пакетной конфигурацией решена после внесения изменений в ConcurrentKafkaListenerContainerFactory следующим образом:

ConcurrentKafkaListenerContainerFactory<String, String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());

factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3600000);

factory.getContainerProperties () setAckMode (org.springframework.kafka.listner.ContainerProperties.AckMode.MANUAL). factory.setMessageConverter (новый BatchMessagingMessageConverter (stringJsonMessageConverter ()));

Ответы [ 2 ]

1 голос
/ 13 апреля 2020

Чтобы получить ровно одну семантику, вы должны использовать транзакции .

Однако ровно однажды семантика применяется только к

read from Kafka -> process -> write to Kafka

и даже тогда, только применяется ко всему потоку (чтение / обработка / запись).

Одни только шаги чтения и обработки могут быть вызваны несколько раз для одной и той же записи (если процесс или запись завершаются неудачно). Единственная гарантия состоит в том, что все это будет обработано только один раз.

0 голосов
/ 16 апреля 2020

Кафка предлагает по крайней мере один раз обработку по умолчанию. Таким образом, реализация идемпотентности у вашего потребителя также может быть эффективной ровно один раз. Допустим, ваша система выглядит как

producer -> topic1 -> consumer1 -> topic2 -> consumer2 -> topic3 -> consumer3

Допустим, окончательная обработка выполняется в потребителе3. Тогда, даже если промежуточные потребители обрабатывают сообщение несколько раз, добавление идемпотентности в потребителя3 гарантирует, что каждое сообщение обрабатывается ровно один раз.

Однако это предполагает, что нормально обрабатывать одно и то же сообщение несколько раз в промежуточных потребителях. , Этот подход может быть проще в более простой системе с небольшим количеством потребителей / тем. Если число потребителей увеличивается, добавление проверок идемпотентности у каждого потребителя может стать громоздким.

...