Spring Kafka и, следовательно, Spring Cloud Stream , позволяют нам создавать транзакционных производителей и процессоров.Мы видим эту функциональность в действии в одном из примеров проектов: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/transaction-kafka-samples:
@Transactional
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public PersonEvent process(PersonEvent data) {
logger.info("Received event={}", data);
Person person = new Person();
person.setName(data.getName());
if(shouldFail.get()) {
shouldFail.set(false);
throw new RuntimeException("Simulated network error");
} else {
//We fail every other request as a test
shouldFail.set(true);
}
logger.info("Saving person={}", person);
Person savedPerson = repository.save(person);
PersonEvent event = new PersonEvent();
event.setName(savedPerson.getName());
event.setType("PersonSaved");
logger.info("Sent event={}", event);
return event;
}
В этом отрывке есть чтение из темы Кафки, запись в базу данных и другая запись вЕще одна тема Кафки , все это транзакционно.
Что мне интересно, и я бы хотел ответить, как это технически достигнуто и реализовано.
Поскольку источник данных и Кафка неt участвовать в транзакции XA (двухфазная фиксация), как реализация гарантирует, что локальная транзакция может читать из Kafka, фиксировать в базу данных и записывать в Kafka все эти транзакции?