Я создаю потребителя Kafka с использованием фреймворка Quarkus, который будет читать топи c с 3 разделами. Приведенный ниже фрагмент кода работает, но на основе журналов я просто инициирую 1 потребителя с 3 разделами. Моя проблема теперь в том, как создать 3 потребителя после запуска приложения.
@Incoming("topic-1")
public CompletionStage<Void> onMessage(KafkaRecord<String, String> message) throws IOException {
LOG.info("Kafka order message with value = {} arrived from topic {} ", message.getPayload(),
message.getTopic());
//JsonObject event = new JsonObject(message.getPayload());
try {
if (true) {
LOG.info("Kafka message: " + message);
}
} catch (Exception e) {
e.printStackTrace();
}
return message.ack();
}
См. Образцы журналов:
INFO [org.apa.kaf.cli.con .int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId = testconsumer, groupId = kafka-detection-consumer] Завершенное назначение для группы в поколении 64: {testconsumer-bf6d314 c -44e1- 47b1-9439-fe4058951841 = Назначение (разделы = [test_part-0, test_part-1, test_part-2])}
![enter image description here](https://i.stack.imgur.com/LdlRb.png)