Несколько потребителей Spring Kafka - PullRequest
0 голосов
/ 02 ноября 2018

Примечание к дублирующимся маркерам : I DID проверьте другой вопрос, но он не отвечает на мой конкретный вопрос ниже.

Итак, представьте, что у меня есть тема Kafka на одном сервере с одним разделом. Так что это очень похоже на очередь.

Теперь давайте предположим, что я хочу, чтобы 100 слушателей ожидали получения значений из очереди. Таким образом, по замыслу, если все 100 потребителей находятся в одной группе, содержимое журнала (или очереди здесь) будет распределено среди потребителей. Таким образом, операция будет закончена в 1/100 времени.

Проблема заключается в том, что прослушиватель Spring Kafka настраивается только с именем темы.

@Service
public class Consumer {

    @KafkaListener(topics = "${app.topic}")
    public void receive(@Payload String message,
                        @Headers MessageHeaders headers) {
        System.out.println("Received message="+message);
        headers.keySet().forEach(key -> System.out.println(key+"->"+headers.get(key)));
    }
}

Может показаться, что Кафка породил 100 потребителей для обработки сообщений из «очереди» (журналов). Как это можно сделать?

1 Ответ

0 голосов
/ 02 ноября 2018

Проверьте этот ответ для понимания потребителей Kafka В Apache Kafka почему не может быть больше экземпляров потребителей, чем разделов?

Чтобы правильно распределить сообщения среди одной группы потребителей, у вас должно быть более одного раздела. Как только вы найдете правильное количество разделов для вашей нагрузки, я использую потоковую передачу по облаку Spring, чтобы лучше управлять вашим параллелизмом и назначением групп потребителей.

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>

Образец раковины

@SpringBootApplication
@EnableBinding(Sink.class)
public class LoggingConsumerApplication {

public static void main(String[] args) {
    SpringApplication.run(LoggingConsumerApplication.class, args);
}

@StreamListener(Sink.INPUT)
public void handle(Person person) {
    System.out.println("Received: " + person);
}

public static class Person {
    private String name;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String toString() {
        return this.name;
    }
}
}

Настройки параллелизма

cloud:
  stream:
    bindings:
      input:
        destination: <topic-name>
        group: <consumer-group>
        consumer:
          headerMode: raw
          partitioned: true
          concurrency: 20

Более подробная информация доступна здесь https://cloud.spring.io/spring-cloud-stream/

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...