Я использую соединитель Alpakka kafka для приема пакетов от kafka. Я использую Consumer в качестве CommittableSource. Я хотел бы создать несколько потребительских потоков на одной машине и использовать их в качестве единого источника. Как я могу этого достичь?
В настоящее время я создал несколько источников, используя Consumer.CommittableSource, и объединяю все источники в один источник, используя функцию «слияние». Но я не уверен, что это правильный подход, так как я не создаю темы.
Ниже приведен исходный код, который я сейчас использую:
public Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control> source() {
Source finalSource = Source.empty();
for (int index = 0; index < consumerConfig.getNoOfConsumers(); index++) {
finalSource = finalSource.merge(Consumer.committableSource(consumerSettings, subscription));
}
return finalSource;
}