Мы используем Spring Cloud Stream для Kafka и ищем семантику «Точно однажды» с потребительским API - PullRequest
0 голосов
/ 27 марта 2019

Мы используем Spring Cloud Stream для Kafka и ищем семантику «Однажды». У нас есть одно решение, которое работает нормально, как и ожидалось 1) Включение Idempotent & Transaction от производителя 2) Использование MetaDataStore для проверки дубликата сообщения со стороны потребителя с ключом (offsetId + partitionId + topicName) С вышеупомянутым решением у нас не будет никакой потери сообщения и никакой дублирующейся обработки

Но теперь мы обнаружили, что есть одно свойство (producer.sendOffsetsToTransaction) Kafka API, которое помогает нам исправлять обработку дубликатов со стороны потребителя без какой-либо логики метаданных хранилища. Теперь я не уверен, как мы можем сделать это с весенним облачным потоком с этим свойством .sendOffsetsToTransaction

1 Ответ

0 голосов
/ 27 марта 2019

Он обрабатывается платформой автоматически, если вы добавляете KafkaTransactionManager в контекст приложения.

Вам необходимо добавить префикс идентификатора транзакции в конфигурацию.

spring.kafka.producer.transaction-id-prefix

и Boot автоматически добавит менеджер транзакций.

См. Свойства производителя .

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

Включает транзакции всвязующее.См. Transaction.id в документации Kafka и Транзакции в документации spring-kafka.Когда транзакции включены, отдельные свойства производителя игнорируются, и все производители используют spring.cloud.stream.kafka.binder.transaction.producer. * Properties.

Контейнер слушателя отправляет смещение в транзакциюперед совершением транзакции, когда слушатель нормально выходит.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...