Spring-Kafka Concurrency Property - PullRequest
       7

Spring-Kafka Concurrency Property

2 голосов
/ 11 апреля 2019

Я продолжаю писать свой первый потребитель Kafka, используя Spring-Kafka.Посмотрел различные варианты, предоставляемые фреймворком, и мало сомневался в этом.Может кто-то уточнить, пожалуйста, ниже, если вы уже работали над этим.

Вопрос - 1 : согласно документации Spring-Kafka, существует 2 способа реализации Kafka-Consumer;«Вы можете получать сообщения, настроив MessageListenerContainer и предоставив прослушиватель сообщений или используя аннотацию @KafkaListener».Может кто-нибудь сказать, когда я должен выбрать один вариант из другого?

Вопрос - 2 : Я выбрал подход KafkaListener для написания своего приложения.Для этого мне нужно инициализировать экземпляр фабрики контейнеров, и внутри фабрики контейнеров есть опция для управления параллелизмом.Просто хочу перепроверить, правильное ли мое понимание параллелизма или нет.

Предположим, у меня есть название темы MyTopic, в котором есть 4 раздела.И чтобы получать сообщения от MyTopic, я запустил 2 экземпляра моего приложения, и эти экземпляры запускаются, устанавливая параллелизм равным 2. Итак, в идеале в соответствии со стратегией назначения kafka, 2 раздела должны идти к потребителю1, а 2 других раздела должны идти к потребителю2,Поскольку для параллелизма установлено значение 2, каждый потребитель запускает 2 потока и будет использовать данные из тем параллельно?Также мы должны рассмотреть что-нибудь, если мы потребляем параллельно.

Вопрос 3 - Я выбрал ручной режим подтверждения и не управлял смещениями извне (не сохраняя его в какой-либо базе данных / файловой системе).Так что мне нужно написать собственный код для перебалансировки, или фреймворк будет управлять им автоматически?Я думаю, что нет, поскольку я подтверждаю только после обработки всех записей.

Вопрос - 4 : Кроме того, в режиме ручного подтверждения ACK какой слушатель даст большую производительность?BATCH Message Listener или обычный прослушиватель сообщений.Я предполагаю, что если я использую прослушиватель Normal Message, смещения будут зафиксированы после обработки каждого сообщения.

Вставил код ниже для вашей справки.

Подтверждение партии Потребитель :

    public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment,
          Consumer<?, ?> consumer) {
      for (ConsumerRecord<String, String> record : records) {
          System.out.println("Record : " + record.value());
          // Process the message here..
          listener.addOffset(record.topic(), record.partition(), record.offset());
       }
       acknowledgment.acknowledge();
    }

Инициализация контейнерного завода:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> configs = new HashMap<String, Object>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enablAutoCommit);
    configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPolInterval);
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    configs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return configs;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    // Not sure about the impact of this property, so going with 1
    factory.setConcurrency(2);
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    factory.getContainerProperties().setConsumerRebalanceListener(RebalanceListener.getInstance());
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setMessageListener(new BatchAckConsumer());
    return factory;
}

Ответы [ 2 ]

1 голос
/ 11 апреля 2019

Q1:

Из документации

Аннотация @KafkaListener используется для определения метода бина в качестве слушателя контейнера слушателя.Компонент обернут в MessagingMessageListenerAdapter, настроенный с различными функциями, такими как конвертеры для преобразования данных, если необходимо, для соответствия параметрам метода.

Большинство атрибутов в аннотации можно настроить с помощью SpEL с помощью "# {…} Или заполнителей свойств ($ {…}). Дополнительную информацию смотрите в Javadoc. "

Этот подход может быть полезен для простых слушателей POJO, и вам не нужно реализовывать какие-либо интерфейсы.Вы также можете прослушивать любые темы и разделы декларативным способом, используя аннотации.Вы также можете потенциально вернуть полученное вами значение, тогда как в случае MessageListener вы связаны подписью интерфейса.

Q2:

В идеале да.Если у вас есть несколько тем для обсуждения, все становится сложнее.Kafka по умолчанию использует RangeAssignor, который имеет свое собственное поведение (вы можете изменить это - см. Подробности в разделе ).

Q3:

Если ваш потребитель умрет, там будетбыть перебалансированным.Если вы подтверждаете это вручную и ваш потребитель умирает до совершения смещения, вам не нужно ничего делать, Kafka справится с этим.Но вы можете получить несколько повторяющихся сообщений (хотя бы один раз)

Q4:

Это зависит от того, что вы подразумеваете под «производительностью».Если вы имели в виду задержку, то путь к каждой записи будет максимально быстрым.Если вы хотите добиться высокой пропускной способности, то пакетное потребление более эффективно.

Я написал несколько примеров с использованием Spring kafka и различных слушателей - посмотрите этот репозиторий

1 голос
/ 11 апреля 2019
  1. @KafkaListener - это управляемый сообщениями "POJO", он добавляет такие вещи, как преобразование полезной нагрузки, сопоставление аргументов и т. Д. Если вы реализуете MessageListener, вы можете получить только необработанный ConsumerRecord от Kafka. См. @ KafkaListener Аннотация .

  2. Да, параллелизм представляет количество потоков; каждый поток создает Consumer; они работают параллельно; в вашем примере каждый получит 2 раздела.

Также мы должны рассмотреть что-либо, если мы потребляем параллельно.

Ваш слушатель должен быть поточно-ориентированным (ни одно общее состояние, ни любое другое состояние не должно быть защищено блокировками.

  1. Непонятно, что вы подразумеваете под «обработкой событий перебалансировки». Когда происходит перебалансировка, каркас будет фиксировать любые ожидающие смещения.

  2. Это не имеет значения; слушатель сообщения Vs. пакетный слушатель это просто предпочтение. Даже с прослушивателем сообщений, с ручным режимом MANUAL, смещения фиксируются, когда все результаты опроса были обработаны. В режиме MANUAL_IMMEDIATE смещения фиксируются один за другим.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...