Групповое отправка кафки сообщения и обновление БД в одной транзакции в SpringBoot - PullRequest
1 голос
/ 20 октября 2019

Мне нужно выполнить несколько операций за одну транзакцию

  • выдать сообщение kafka
  • update Таблица A
  • update Таблица B

Я в порядке с отправкой сообщения и не обновляю обе таблицы (A и B). Я не в порядке, чтобы создать сообщение и обновить одну из таблиц.

Я пытаюсь достичь своей цели с помощью @Transactional аннотации


import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

 @Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.SERIALIZABLE)
    public void handle(Event approvalEvent) {
        var entity = entityService.getLatestVersion(approvalEvent.getTransactionId());

        entityService.approve(entity.getTransactionId());
        logService.logApproval(entity);
        producer.send(approvalEvent);
    }

правильно ли я это делаю?

1 Ответ

1 голос
/ 21 октября 2019

Проблема с описанным выше подходом заключается в том, что вы взаимодействуете с двумя разными системами (база данных и очередь сообщений) в одной транзакции. Комбинации сценариев, которые нужно обрабатывать, когда работа в одной системе успешна и в другой системе происходит сбой, делает решение сложным.

В мире микросервисов существует схема, позволяющая обрабатывать точно такой же сценарий. Это называется исходящим шаблоном.

Подробнее об этом можно прочитать здесь .

Краткое резюме: в вашей базе данных есть дополнительная таблица с именем outbox, которая содержит сообщения, которые должны быть опубликованыочередь сообщений.

В транзакции БД для добавления \ обновления сущности вы вставляете строку в инструмент таблицы исходящих сообщений, содержащий сведения об операции над сущностью.

Затем вы асинхронно читаете строкииз исходящей таблицы и публикации в очередь сообщений с помощью опроса или с помощью сбора данных изменений. Смотрите пример реализации здесь с использованием дебезия.

Ваш код транзакции будет выглядеть следующим образом.

@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.SERIALIZABLE)
public void handle(Event approvalEvent) {
    var entity = entityService.getLatestVersion(approvalEvent.getTransactionId());

    entityService.approve(entity.getTransactionId());
    logService.logApproval(entity);
    //Outbox is the table containing the records to be published to MQ 
    outboxRepo.save(approvalEvent);
}
...