Как зафиксировать смещения многопоточности с помощью camel-kafka? - PullRequest
0 голосов
/ 22 июня 2019

Как задано в вопросе Как вручную управлять фиксацией смещения с помощью camel-kafka? Я хочу фиксировать смещения вручную с помощью camel-kafka. Мой маршрут:

.from(kafka:topic1)
 .aggregate(new GroupByExchangeStrategy())
.to(kafka:topic2)
 .process(new ManualCommitProcessor())

, где ManualCommitProcessor выполнит обязательство после отправки сообщения в другую тему.

Проблема в том, что агрегатор и производитель кафки работают в отдельных потоках для потребителя кафки, который отвечает за смещение обязательств. Следовательно, я заканчиваю в

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

Существует ли возможность повторного вызова потока потребителя после агрегации и отправки для фиксации смещения?

1 Ответ

0 голосов
/ 23 июня 2019

Нет, это невозможно, потребительский поток работает независимо от выхода агрегатора.

...