Приоритет Кафка тема - PullRequest
       31

Приоритет Кафка тема

0 голосов
/ 27 апреля 2020

Мне нужно полностью прочитать сообщение из темы1, а затем прочитать сообщение из темы2. Я буду получать сообщения в этих топиках c каждый день. Мне удалось прекратить чтение сообщений из topic2, прежде чем читать все сообщения в topic1, но это происходит для меня только один раз при запуске сервера. Может кто-нибудь помочь мне с этим сценарием.

Код ListenerConfig

@EnableKafka
@Configuration
public class ListenerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;


    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        return factory;
    }

    @Bean("kafkaListenerContainerTopic1Factory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerTopic1Factory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setIdleEventInterval(60000L);
        factory.setBatchListener(true);
        return factory;
    }

    @Bean("kafkaListenerContainerTopic2Factory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerTopic2Factory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        return factory;
    }

}

Код списка

@Service
public class Listener {

    private static final Logger LOG = LoggerFactory.getLogger(Listener.class);

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @KafkaListener(id = "first-listener", topics = "topic1", containerFactory = "kafkaListenerContainerTopic1Factory")
    public void receive(@Payload List<String> messages,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.OFFSET) List<Long> offsets)  {
        for (int i = 0; i < messages.size(); i++) {
            LOG.info("received first='{}' with partition-offset='{}'",
                    messages.get(i), partitions.get(i) + "-" + offsets.get(i));
        }
    }

    @KafkaListener(id = "second-listener", topics = "topic2", containerFactory = "kafkaListenerContaierTopic2Factory" , autoStartup="false" )
    public void receiveRel(@Payload List<String> messages,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
        for (int i = 0; i < messages.size(); i++) {
            LOG.info("received second='{}' with partition-offset='{}'",
                    messages.get(i), partitions.get(i) + "-" + offsets.get(i));
        }
    }

    @EventListener()
    public void eventHandler(ListenerContainerIdleEvent event) {
        LOG.info("Inside event");
        this.registry.getListenerContainer("second-listener").start();
    }

Пожалуйста, помогите мне в решении, так как этот цикл должен происходить каждый день. Полное чтение сообщения topic1, а затем чтение сообщения topic2.

Ответы [ 2 ]

0 голосов
/ 27 апреля 2020

Вы уже используете прослушиватель события ожидания для запуска второго прослушивателя - он также должен остановить первого прослушивателя.

Когда второй прослушиватель переходит в режим ожидания; остановите его.

Вы должны проверить, для какого контейнера предназначено событие, чтобы решить, какой контейнер остановить и / или запустить.

Затем, используя TaskScheduler, запланируйте start() из первый слушатель в следующий раз, когда вы захотите его запустить.

0 голосов
/ 27 апреля 2020

Topi c в Кафке - это абстракция, где публикуется поток записей. Потоки, естественно, не ограничены, поэтому у них есть начало, но у них нет определенного конца. Для вашего случая, прежде всего, вам необходимо четко определить, что является концом вашего topic1 и вашего topic2, чтобы вы могли останавливать / предполагать своих потребителей при необходимости. Возможно, вы знаете, сколько сообщений вы будете обрабатывать для каждой топи c, поэтому вы можете использовать: position или commmited , чтобы остановить одного потребителя и предположить другого в этот момент. Или, если вы используете потоковую среду, у них обычно есть окно сеанса, в котором среда обнаруживает элементы групп по сеансам активности. Вы также можете предпочесть поместить этот лог c на стороне приложения, чтобы вам не нужно было останавливать / запускать какие-либо потребительские потоки.

...