Вместо того, чтобы зацикливать три темы в одном методе, вы можете создать скелетный поток, подобный такому, который будет использовать любую тему. См. Примеры здесь
Я не могу сказать, если это "решит" проблему, но попытка использовать темы с разными схемами в одном приложении обычно не является масштабируемым шаблоном, но этоне очень понятно, что ты пытаешься сделать.
class ConsumerThread extends Thread {
KafkaConsumer consumer;
AtomicBoolean stopped = new AtomicBoolean();
ConsumerThread(Properties props, String subscribePattern) {
this.consumer = new KafkaConsumer...
this.consumer.subscribe(subscribePattern);
}
@Override
public void run() {
while (!this.stopped.get()) {
... records = this.consumer.poll(100);
for ( ... each record ... ) {
// Process record
}
}
}
public void stop() {
this.stopped.set(true);
}
}
Не предназначено для производственного класса
Затем независимо от трех потребителей.
new ConsumerThread("t1").start();
new ConsumerThread("t2").start();
new ConsumerThread("t3").start();
Примечание : KafkaConsumer
не является поточно-ориентированным.