Каким образом Spring Kafka / Spring Cloud Stream гарантируют транзакционность / атомарность в отношении базы данных и Kafka? - PullRequest
0 голосов
/ 26 апреля 2019

Spring Kafka и, следовательно, Spring Cloud Stream , позволяют нам создавать транзакционных производителей и процессоров.Мы видим эту функциональность в действии в одном из примеров проектов: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/transaction-kafka-samples:

@Transactional
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public PersonEvent process(PersonEvent data) {
        logger.info("Received event={}", data);
        Person person = new Person();
        person.setName(data.getName());

        if(shouldFail.get()) {
            shouldFail.set(false);
            throw new RuntimeException("Simulated network error");
        } else {
            //We fail every other request as a test
            shouldFail.set(true);
        }
        logger.info("Saving person={}", person);

        Person savedPerson = repository.save(person);

        PersonEvent event = new PersonEvent();
        event.setName(savedPerson.getName());
        event.setType("PersonSaved");
        logger.info("Sent event={}", event);
        return event;
    }

В этом отрывке есть чтение из темы Кафки, запись в базу данных и другая запись вЕще одна тема Кафки , все это транзакционно.

Что мне интересно, и я бы хотел ответить, как это технически достигнуто и реализовано.

Поскольку источник данных и Кафка неt участвовать в транзакции XA (двухфазная фиксация), как реализация гарантирует, что локальная транзакция может читать из Kafka, фиксировать в базу данных и записывать в Kafka все эти транзакции?

1 Ответ

1 голос
/ 26 апреля 2019

Гарантии нет, только внутри самой Кафки.

Spring предоставляет транзакцию синхронизация , поэтому коммиты находятся близко друг к другу, но БД может фиксировать, а Кафка не делает,Таким образом, вам приходится иметь дело с возможностью дублирования.

Правильный способ сделать это при непосредственном использовании spring-kafka - НЕ с @Transactional, а с использованием ChainedKafkaTransactionManager в контейнере слушателя.

См. Синхронизация транзакций .

См. Также Распределенные транзакции в Spring с и без XA и «Шаблон Best Ehaps 1PC» для фона.

Однако в Stream нет поддержки менеджера цепочек транзакций, поэтому требуется @Transactional (с менеджером транзакций БД).Это даст результаты, аналогичные цепочечному tx-менеджеру, причем сначала выполняется БД, непосредственно перед Kafka.

...