Я продолжаю писать свой первый потребитель Kafka, используя Spring-Kafka.Посмотрел различные варианты, предоставляемые фреймворком, и мало сомневался в этом.Может кто-то уточнить, пожалуйста, ниже, если вы уже работали над этим.
Вопрос - 1 : согласно документации Spring-Kafka, существует 2 способа реализации Kafka-Consumer;«Вы можете получать сообщения, настроив MessageListenerContainer и предоставив прослушиватель сообщений или используя аннотацию @KafkaListener».Может кто-нибудь сказать, когда я должен выбрать один вариант из другого?
Вопрос - 2 : Я выбрал подход KafkaListener для написания своего приложения.Для этого мне нужно инициализировать экземпляр фабрики контейнеров, и внутри фабрики контейнеров есть опция для управления параллелизмом.Просто хочу перепроверить, правильное ли мое понимание параллелизма или нет.
Предположим, у меня есть название темы MyTopic, в котором есть 4 раздела.И чтобы получать сообщения от MyTopic, я запустил 2 экземпляра моего приложения, и эти экземпляры запускаются, устанавливая параллелизм равным 2. Итак, в идеале в соответствии со стратегией назначения kafka, 2 раздела должны идти к потребителю1, а 2 других раздела должны идти к потребителю2,Поскольку для параллелизма установлено значение 2, каждый потребитель запускает 2 потока и будет использовать данные из тем параллельно?Также мы должны рассмотреть что-нибудь, если мы потребляем параллельно.
Вопрос 3 - Я выбрал ручной режим подтверждения и не управлял смещениями извне (не сохраняя его в какой-либо базе данных / файловой системе).Так что мне нужно написать собственный код для перебалансировки, или фреймворк будет управлять им автоматически?Я думаю, что нет, поскольку я подтверждаю только после обработки всех записей.
Вопрос - 4 : Кроме того, в режиме ручного подтверждения ACK какой слушатель даст большую производительность?BATCH Message Listener или обычный прослушиватель сообщений.Я предполагаю, что если я использую прослушиватель Normal Message, смещения будут зафиксированы после обработки каждого сообщения.
Вставил код ниже для вашей справки.
Подтверждение партии Потребитель :
public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
for (ConsumerRecord<String, String> record : records) {
System.out.println("Record : " + record.value());
// Process the message here..
listener.addOffset(record.topic(), record.partition(), record.offset());
}
acknowledgment.acknowledge();
}
Инициализация контейнерного завода:
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> configs = new HashMap<String, Object>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enablAutoCommit);
configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPolInterval);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
configs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return configs;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
// Not sure about the impact of this property, so going with 1
factory.setConcurrency(2);
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
factory.getContainerProperties().setConsumerRebalanceListener(RebalanceListener.getInstance());
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setMessageListener(new BatchAckConsumer());
return factory;
}