Хорошая практика при использовании кафки с jpa - PullRequest
0 голосов
/ 27 июня 2018

Я сейчас нахожусь в проекте, где используются JPA и Kafka. Я пытаюсь найти набор хорошей практики для объединения этих операций.

В существующем коде производитель используется в той же транзакции, что и jpa, однако из того, что я прочитал, кажется, что они не разделяют транзакцию.

@PostMapping
@Transactional
public XDto createX(@RequestBody XRequest request) {
    Xdto dto = xService.create(request);
    kafkaProducer.putToQueue(dto, Type.CREATE);
    return dto;
}

где производитель кафки определяется следующим образом:

public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Type> template;

    public void putToQueue(Dto dto, Type eventType) {
        template.send("event", new Event(dto, eventType));
    }
}

Является ли это допустимым вариантом использования для объединения jpa и kafka, правильно ли определены границы транзакций?

Ответы [ 4 ]

0 голосов
/ 23 февраля 2019

Как уже говорили другие, вы можете использовать сбор данных изменений для безопасного распространения изменений, примененных к вашей базе данных, на Apache Kafka. Вы не можете обновить базу данных и Kafka за одну транзакцию, так как последняя не поддерживает какой-либо протокол двухфазной фиксации.

Вы можете либо CDC сами таблицы, либо, если вы хотите иметь больше контроля над структурой, отправленной в Kafka, применить шаблон «исходящие». В этом случае ваше приложение будет записывать в свои фактические бизнес-таблицы, а также в таблицу «исходящих», которая содержит сообщения для отправки в Kafka. Вы можете найти подробное описание этого подхода в этом блоге .

Отказ от ответственности: я являюсь автором этого поста и руководителем Debezium, одного из решений CDC, упомянутых в некоторых других ответах.

0 голосов
/ 01 июля 2018

Вы не должны помещать отправляющее сообщение в kafka в транзакции. Если вам нужна логика, когда не удается отправить событие на kafka, а затем отменить транзакцию, в этом случае будет лучше использовать spring-retry. Просто поместите код, связанный с отправкой события в kafka, в аннотированный метод @Retryable, а также добавьте аннотированный метод @Recover с логикой возврата изменений, сделанных ранее в БД.

0 голосов
/ 06 июля 2018

Рассматривая ваш вопрос, я предполагаю, что вы пытаетесь добиться CDC (Change Data Capture) вашей OLTP-системы, то есть регистрируете каждое изменение, которое поступает в транзакционную базу данных. Есть два способа приблизиться к этому.

  1. Код приложения выполняет двойную запись в транзакционную БД и Кафку. Это противоречиво и снижает производительность. Несогласованно, потому что, когда вы выполняете двойную запись в две независимые системы, данные облажаются при сбое любой из записей, а отправка данных в Kafka в потоке транзакций добавляет задержку, с которой вы не хотите идти на компромисс.
  2. Извлечь изменения из фиксации БД (триггеры уровня базы данных / приложения или журнал транзакций) и отправить их в Kafka. Он очень последовательный и никак не влияет на вашу транзакцию. Согласованно, потому что журналы фиксации БД являются отражением транзакций БД после успешной фиксации. Существует множество решений, которые используют этот подход, таких как база данных , maxwell , дебезиум и т. Д.

Если CDC - ваш вариант использования, попробуйте использовать любое из уже доступных решений.

0 голосов
/ 27 июня 2018

это не будет работать так, как задумано, когда транзакция завершится неудачно. Взаимодействие с kafka не является частью транзакции.

Возможно, вы захотите взглянуть на TransactionalEventListener Возможно, вы захотите написать сообщение kafka в событии AFTER_COMMIT. даже тогда публикация кафки может закончиться неудачей.

Другим вариантом является запись в БД с использованием jpa, как вы делаете. Пусть дебезиум прочитает обновленные данные из вашей базы данных и отправит их в kafka. Мероприятие будет проходить в другом формате, но гораздо богаче.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...