Обновление состояния в цепочке потоков Kafka без использования потоков Kafka способом EOS - PullRequest
0 голосов
/ 03 мая 2020

В настоящее время я работаю над развертыванием цепочки процессов распределенного потока с использованием Kafka, но не библиотеки потоков Kafka. Я создал вид узла, который может быть выполнен и принимать в качестве входных данных топи c, обрабатывать полученные данные и отправлять их в выходные топи c. Узел представляет собой простую пару потребитель / производитель, которая связана с уникальным разделом восходящего потока. Производитель идемпотентен, обработка выполняется в контексте транзакции, например:

producer.initTransaction();
try
{
        producer.beginTransaction();

        //process

        producer.commitTransaction();
}
catch (KafkaException e)
{
        producer.abortTransaction();
}

Я также использовал метод producer.sendoffsetstotransaction, чтобы обеспечить фиксацию atomi c для потребителя. Я хотел бы использовать хранилище значений ключей для сохранения состояния моих узлов (я думал о MapDB, который выглядит простым в использовании).

Но мне интересно, если я обновлю свое состояние внутри транзакции, например, map.put(key, value), обеспечит ли транзакция, что состояние будет обновлено ровно один раз?

Большое спасибо

...