Как задано в вопросе Как вручную управлять фиксацией смещения с помощью camel-kafka? Я хочу фиксировать смещения вручную с помощью camel-kafka. Мой маршрут:
.from(kafka:topic1)
.aggregate(new GroupByExchangeStrategy())
.to(kafka:topic2)
.process(new ManualCommitProcessor())
, где ManualCommitProcessor
выполнит обязательство после отправки сообщения в другую тему.
Проблема в том, что агрегатор и производитель кафки работают в отдельных потоках для потребителя кафки, который отвечает за смещение обязательств. Следовательно, я заканчиваю в
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
Существует ли возможность повторного вызова потока потребителя после агрегации и отправки для фиксации смещения?