Синхронизация транзакций между базой данных и производителем Kafka - PullRequest
0 голосов
/ 06 сентября 2018

У нас есть архитектура микроуслуг, где Кафка используется в качестве механизма связи между сервисами.Некоторые из сервисов имеют свои собственные базы данных.Скажем, пользователь звонит в службу A, что должно привести к созданию записи (или набора записей) в базе данных этой службы.Кроме того, об этом событии следует сообщать другим службам в качестве элемента темы Кафки.Каков наилучший способ гарантировать, что записи базы данных записываются только в том случае, если тема Kafka успешно обновлена ​​(по существу, создается распределенная транзакция вокруг обновления базы данных и обновления Kafka)?

Мы думаем оиспользуя spring-kafka (в сервисе Spring Boot WebFlux), и я вижу, что он имеет KafkaTransactionManager , но, насколько я понимаю, это больше касается самих транзакций Kafka (обеспечение согласованностимежду производителями и потребителями Kafka), вместо того, чтобы синхронизировать транзакции между двумя системами (см. здесь : «Kafka не поддерживает XA, и вам приходится иметь дело с возможностью того, что DB tx может зафиксировать, в то время как Kafka txоткатывается. »).Кроме того, я думаю, что этот класс опирается на среду транзакций Spring, которая, по крайней мере, насколько я понимаю, привязана к потокам, и не будет работать, если использовать реактивный подход (например, WebFlux), когда различные части операции могут выполнятьсяразные темы.(Мы используем реагирующий-pg-клиент , поэтому обрабатываем транзакции вручную, а не с помощью среды Spring.)

Некоторые варианты, которые я могу придумать:

  1. Не записывайте данные в базу данных: только записывайте их в Kafka.Затем используйте потребителя (в службе A) для обновления базы данных.Кажется, что это может быть не самым эффективным, и возникнут проблемы в том, что служба, которую пользователь вызвал, не может сразу увидеть изменения базы данных, которые он должен был только что создать.
  2. Не писать напрямую в Kafka: writeтолько в базу данных, и используйте что-то вроде Дебезиум , чтобы сообщить об изменениях Кафке.Проблема здесь заключается в том, что изменения основаны на отдельных записях базы данных, в то время как значимое для бизнеса событие для хранения в Kafka может включать комбинацию данных из нескольких таблиц.
  3. Сначала выполнить запись в базу данных (в случае сбоя выполнитеничего и просто выкинуть исключение).Затем при записи в Kafka предположим, что запись может быть неудачной.Используйте встроенную функцию автоматической повторной попытки, чтобы попытаться продолжить работу некоторое время.Если это в конечном итоге полностью не удается, попробуйте написать в очередь недоставленных писем и создать какой-то ручной механизм для администраторов, чтобы разобраться с этим.И если запись в DLQ завершается неудачно (т. Е. Кафка полностью отключена), просто зарегистрируйте ее другим способом (например, в базу данных) и снова создайте какой-то ручной механизм для администраторов, чтобы разобраться в этом.

Кто-нибудь получил какие-либо мысли или советы по поводу вышеизложенного или смог исправить любые ошибки в моих предположениях выше?

Заранее спасибо!

Ответы [ 2 ]

0 голосов
/ 29 декабря 2018

Прежде всего, я должен сказать, что я не Кафка и не эксперт по Spring, но я думаю, что это более концептуальная задача при записи в независимые ресурсы, и решение должно быть адаптировано к вашему технологическому стеку. Кроме того, я должен сказать, что это решение пытается решить проблему без внешнего компонента, такого как Debezium, потому что, по моему мнению, каждый дополнительный компонент создает проблемы при тестировании, обслуживании и запуске приложения, которые часто недооцениваются при выборе такого варианта. Также не каждая база данных может быть использована в качестве источника Debezium.

Чтобы убедиться, что мы говорим об одних и тех же целях, давайте проясним ситуацию на упрощенном примере авиакомпании, где клиенты могут покупать билеты. После успешного заказа клиент получит сообщение (почта, push-уведомление и т. Д.), Отправленное внешней системой обмена сообщениями (системой, с которой мы должны поговорить).

В традиционном мире JMS с XA-транзакцией между нашей базой данных (где мы храним заказы) и JMS-провайдером это будет выглядеть следующим образом: Клиент устанавливает заказ для нашего приложения, где мы запускаем транзакцию. Приложение сохраняет заказ в своей базе данных. Затем сообщение отправляется в JMS, и вы можете совершить транзакцию. Обе операции участвуют в транзакции, даже когда они разговаривают со своими собственными ресурсами. Поскольку транзакция XA гарантирует ACID, мы в порядке.

Давайте добавим в игру Кафку (или любой другой ресурс, который не может участвовать в транзакции XA). Поскольку больше не существует координатора, который синхронизирует обе транзакции, основной идеей следующего является разделение обработки на две части с постоянным состоянием.

Когда вы сохраняете заказ в своей базе данных, вы также можете сохранить сообщение (с агрегированными данными) в той же базе данных (например, как JSON в CLOB-столбце), которую впоследствии вы хотите отправить в Kafka. Тот же ресурс - ACID гарантированно, пока все хорошо. Теперь вам нужен механизм, который опрашивает вашу «KafkaTasks» -Table для новых задач, которые должны быть отправлены в Kafka-Topic (например, с помощью службы таймера, возможно, аннотация @Scheduled может использоваться в Spring). После того, как сообщение было успешно отправлено в Kafka, вы можете удалить запись задачи. Это гарантирует, что сообщение для Kafka отправляется только тогда, когда заказ также успешно сохраняется в базе данных приложения. Получили ли мы те же гарантии, что и при использовании транзакции XA? К сожалению, нет, так как все еще существует вероятность того, что запись в Kafka работает, но удалить задачу не удается. В этом случае механизм повтора (вам понадобится тот, который указан в вашем вопросе) обработает задачу и отправит сообщение дважды. Если ваше экономическое обоснование удовлетворено этой «хотя бы раз» гарантией, вы сделали здесь полусложное решение imho, которое может быть легко реализовано как функциональность фреймворка, так что не всем придется беспокоиться о деталях.

Если вам нужен «ровно один раз», вы не можете сохранить свое состояние в базе данных приложения (в данном случае «удаление задачи» является «состоянием»), но вместо этого вы должны сохранить его в Kafka (при условии, что у вас есть ACID гарантирует между двумя темами Кафки). Пример: допустим, у вас есть 100 задач в таблице (идентификаторы от 1 до 100), и задание обрабатывает первые 10. Вы записываете свои сообщения Kafka в их тему, а другое сообщение с идентификатором 10 - в «вашу тему». Все в той же Кафке-транзакции. В следующем цикле вы потребляете свою тему (значение 10) и принимаете это значение, чтобы получить следующие 10 задач (и удалить уже обработанные задачи).

Если есть более простые (в приложении) решения с такими же гарантиями, я с нетерпением жду вашего ответа!

Извините за длинный ответ, но я надеюсь, что это поможет.

0 голосов
/ 07 сентября 2018

Я бы предложил использовать слегка измененный вариант подхода 2.

Запись только в вашу базу данных, но в дополнение к фактической записи в таблицу, также записывайте «события» в специальную таблицу в той же базе данных; эти записи о событиях будут содержать необходимые вам агрегаты. Проще всего, вы просто вставите другую сущность, например сопоставляется JPA, который содержит свойство JSON с совокупной полезной нагрузкой. Конечно, это может быть автоматизировано с помощью некоторых средств прослушивателя транзакций / компонента фреймворка.

Затем используйте Debezium, чтобы захватить изменения только из этой таблицы и направить их в Kafka. Таким образом, у вас есть и то и другое: в конечном итоге непротиворечивое состояние в Kafka (события в Kafka могут отставать или вы можете увидеть несколько событий во второй раз после перезапуска, но в конечном итоге они будут отражать состояние базы данных) без необходимости в распределенных транзакциях, и семантика событий бизнес-уровня, к которой вы стремитесь.

(Отказ от ответственности: я веду Debezium; как ни странно, я только в процессе написания блога, обсуждающего этот подход более подробно)

Вот посты

https://debezium.io/blog/2018/09/20/materializing-aggregate-views-with-hibernate-and-debezium/

https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/

...