Фиксация на основе событий в транзакции KafkaTemplate с использованием KafkaTransactionManager - PullRequest
0 голосов
/ 14 сентября 2018

Spring управляемый KafkaTemplate предоставляет

template.send(record).addCallback(...
template.executeInTransaction(...

Теперь, допустим, у меня есть метод doWork (), который запускается в событии (скажем, в сообщении TCP / IP).

@Autowired
KafkaTemplate template;

// This method is triggered on a event
doWork(EventType event){
    switch(event){
        case Events.Type1 :
            template.send(record); break;
        case Events.Type2 :
            // Question : How do I achieve a commit of all my previous sends here?
         default : break;
    }
}

По сути, мне нужно выполнить транзакцию, добавив @Transaction поверх doWork () или

template.executeInTransaction(...

в коде.Но я хочу собрать пару [template.send ()] и выполнить коммит после нескольких вызовов метода doWork (), как мне этого добиться?

В моих конфигурациях производителя включены транзакциии KafkaTransactionManager, подключенный к фабрике производителя.

1 Ответ

0 голосов
/ 14 сентября 2018
kafkaTemplate.executeInTransaction(t -> {
    boolean stayIntransaction = true;
    while (stayInTransaction) {
        Event event = readTcp()
        doWork(event);
        stayInTransaction = transactionDone(event);
    }
}

Пока метод doWork() использует один и тот же шаблон и выполняется в рамках обратного вызова, работа будет выполняться в транзакции.

Или

@Transactional
public void doIt() {
    boolean stayIntransaction = true;
    while (stayInTransaction) {
        Event event = readTcp()
        doWork(event);
        stayInTransaction = transactionDone(event);
    }
}

При использовании декларативных транзакций.

Если события TCP асинхронные, вам нужно каким-то образом передать их потоку, выполняющему транзакцию, например, используя BlockingQueue<?>.

...