Я публикую сообщение на topicX
.Чтобы использовать это сообщение несколькими потребителями, я написал следующее ConsumerConfig
(Обратите внимание, что у каждого потребителя разные groupd_id
.
@EnableKafka
@Configuration
public class ConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(GROUP_ID_CONFIG, "my.groupid." + groupId);
props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, Car> consumerFactory(String groupId) {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(groupId), new StringDeserializer(),
new JsonDeserializer<>(Car.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Car> consumerOneKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Car> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory("one"));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Car> consumerTwoKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Car> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory("two"));
return factory;
}
}
А затем для конкретного потребителя я добавил следующую аннотацию (указанную группуid и фабрика контейнеров)
@KafkaListener(id = "my.groupid.one", topics = "${app.topic.topicname}", containerFactory = "consumerOneKafkaListenerContainerFactory")
Но когда я публикую сообщение, его использует только один потребитель. Как я могу настроить kafka так, чтобы каждое опубликованное сообщение использовалось несколькими потребителями?
Я пробовал этот код с 1 разделом и n разделами (где n = количество потребителей), и все же у меня возникла эта проблема.
ОБНОВЛЕНИЕ
Вот мой класс продюсера темы
@Service
@Slf4j
public class TopicProducer {
@Autowired
private KafkaTemplate<String, TopicPayload> kafkaTemplate;
public void send(String topicName, TopicPayload payload) {
log.info("sending message={} to topic={}", payload, topicName);
kafkaTemplate.send(topicName, payload);
}
}
А вот мой TopicProducerConfig
@Configuration
public class TopicProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, TopicPayload> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, TopicPayload> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Я отправляю сообщение по телефону producer.send(topicName, payload)
.