Невозможно прочитать одно и то же сообщение на одну и ту же тему несколькими пользователями - PullRequest
0 голосов
/ 12 июня 2018

Я публикую сообщение на topicX.Чтобы использовать это сообщение несколькими потребителями, я написал следующее ConsumerConfig (Обратите внимание, что у каждого потребителя разные groupd_id.

@EnableKafka
@Configuration
public class ConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(GROUP_ID_CONFIG, "my.groupid." + groupId);
        props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Car> consumerFactory(String groupId) {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(groupId), new StringDeserializer(),
                new JsonDeserializer<>(Car.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Car> consumerOneKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Car> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("one"));
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Car> consumerTwoKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Car> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("two"));
        return factory;
    }
}

А затем для конкретного потребителя я добавил следующую аннотацию (указанную группуid и фабрика контейнеров)

@KafkaListener(id = "my.groupid.one", topics = "${app.topic.topicname}", containerFactory = "consumerOneKafkaListenerContainerFactory")

Но когда я публикую сообщение, его использует только один потребитель. Как я могу настроить kafka так, чтобы каждое опубликованное сообщение использовалось несколькими потребителями?

Я пробовал этот код с 1 разделом и n разделами (где n = количество потребителей), и все же у меня возникла эта проблема.

ОБНОВЛЕНИЕ

Вот мой класс продюсера темы

@Service
@Slf4j
public class TopicProducer {

    @Autowired
    private KafkaTemplate<String, TopicPayload> kafkaTemplate;

    public void send(String topicName, TopicPayload payload) {
        log.info("sending message={} to topic={}", payload, topicName);
        kafkaTemplate.send(topicName, payload);
    }

}

А вот мой TopicProducerConfig

@Configuration
public class TopicProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, TopicPayload> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, TopicPayload> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Я отправляю сообщение по телефону producer.send(topicName, payload).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...