Весенняя кафка и ровно один раз гарантия доставки - PullRequest
0 голосов
/ 29 сентября 2018

Я использую Spring Kafka и Spring Boot и просто задаюсь вопросом, как настроить моего потребителя, например:

@KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
public void sendPost(ConsumerRecord<String, Post> consumerRecord, Acknowledgment ack) {

    // do some logic

    ack.acknowledge();
}

, чтобы использовать точно однократную гарантию доставки?

Должен ли я только добавить org.springframework.transaction.annotation.Transactional аннотации по sendPost методу и это все или мне нужно выполнить несколько дополнительных шагов для достижения этой цели?

ОБНОВЛЕНО

Это мой текущий конфиг

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties, KafkaTransactionManager<Object, Object> transactionManager) {

        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        //factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setTransactionManager(transactionManager);
        factory.setConsumerFactory(consumerFactory(kafkaProperties));

        return factory;
    }


    @Bean
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15000000);

        return props;
    }

    @Bean
    public ProducerFactory<String, Post> postProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Post> postKafkaTemplate() {
        return new KafkaTemplate<>(postProducerFactory());
    }

    @Bean
    public ProducerFactory<String, Update> updateProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Update> updateKafkaTemplate() {
        return new KafkaTemplate<>(updateProducerFactory());
    }

    @Bean
    public ProducerFactory<String, Message> messageProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Message> messageKafkaTemplate() {
        return new KafkaTemplate<>(messageProducerFactory());
    }

, но происходит сбой со следующей ошибкой:

***************************
APPLICATION FAILED TO START
***************************

Description:

Parameter 0 of method kafkaTransactionManager in org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration required a single bean, but 3 were found:
    - postProducerFactory: defined by method 'postProducerFactory' in class path resource [com/example/domain/configuration/messaging/KafkaProducerConfig.class]
    - updateProducerFactory: defined by method 'updateProducerFactory' in class path resource [com/example/domain/configuration/messaging/KafkaProducerConfig.class]
    - messageProducerFactory: defined by method 'messageProducerFactory' in class path resource [com/example/domain/configuration/messaging/KafkaProducerConfig.class]

Что я делаю не так?

1 Ответ

0 голосов
/ 01 октября 2018

Вы не должны использовать ручные подтверждения.Вместо этого вставьте KafkaTransactionManager в контейнер слушателя, и контейнер отправит смещение в транзакцию, когда метод слушателя завершится нормально (или откат в противном случае).

Вы не должны делать acks через потребителя ровно один раз.

РЕДАКТИРОВАТЬ

application.yml

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        isolation:
          level: read_committed
    producer:
      transaction-id-prefix: myTrans.

Приложение

@SpringBootApplication
public class So52570118Application {

    public static void main(String[] args) {
        SpringApplication.run(So52570118Application.class, args);
    }

    @Bean // override boot's auto-config to add txm
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTransactionManager<Object, Object> transactionManager) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setTransactionManager(transactionManager);
        return factory;
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    @KafkaListener(id = "so52570118", topics = "so52570118")
    public void listen(String in) throws Exception {
        System.out.println(in);
        Thread.sleep(5_000);
        this.template.send("so52570118out", in.toUpperCase());
        System.out.println("sent");
    }

    @KafkaListener(id = "so52570118out", topics = "so52570118out")
    public void listenOut(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner() {
        return args -> this.template.executeInTransaction(t -> t.send("so52570118", "foo"));
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic("so52570118", 1, (short) 1);
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("so52570118out", 1, (short) 1);
    }

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...