Кафка Тема заказа с весны Кафка - PullRequest
0 голосов
/ 23 января 2019

Я пытаюсь настроить систему, которая будет читать две разные темы Кафки - одну для живых сообщений и одну для массовых сообщений. Надежда состоит в том, что независимо от того, сколько сообщений в «массовой» теме, все в «живой» теме отдается предпочтение.

кажется , что Spring Kafka делает это из коробки - иногда.

То, что у меня есть, просто:

@KafkaListener(topics = {"sync-live", "sync-bulk"}, concurrency = "1")

И моя конфигурация:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
    return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory
        = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

И иногда это будет делать именно то, что я хочу, а иногда - нет. Фактически, пару раз я видел, как он переключается с «живой» темы на «массовую», когда еще есть живые сообщения для обработки!

Есть ли способ заставить Spring Kafka читать только из второй темы, когда первая пуста?

Приветствия

1 Ответ

0 голосов
/ 23 января 2019

Это не имеет никакого отношения к Spring, распределение сообщений по темам, на которые подписан потребитель, является функцией брокера.

Проблема в том, что вы используете один потребитель (контейнер слушателя) для обоихthemes.

Чтобы получить выделенного потребителя для каждого, используйте несколько аннотаций ...

@KafkaListener(groupId = "live.group", topics = "sync-live", concurrency = "1")
@KafkaListener(groupId = "bulk.group", topics = "sync-bulk", concurrency = "1")
public synchronized void listen(...) { ... }

synchronized предотвратит одновременный вызов слушателем обоих контейнеров.Пропустите это, если это не проблема.

...