Ситуация довольно проста - у меня есть два kafka
слушателя из двух разных тем, одна из которых рассматривается как тема данных, а вторая как тема команды, когда я получаю запрос от темы команды, в которую я хочу вернутьсявремя для темы данных до некоторой даты:
//get reference to consumer for data-topic
protected Consumer<?, ?> consumer = ...
//when event from command-topic received i want to shift back in time
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(interest);
К сожалению, я сталкиваюсь с ошибкой:
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1808) ~[kafka-clients-1.0.2.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1792) ~[kafka-clients-1.0.2.jar:na]
Я считаю, что одновременно слушатель данных обрабатывает записи и вызывает из другогоПоток offsetsForTimes нарушает правило однопоточности, но что, по вашему мнению, является лучшим способом решения этой проблемы?