Функция @KafkaListener вызвана, но получает пустой список - PullRequest
0 голосов
/ 09 июля 2020

Я выполняю локально Продюсер и Потребитель, используя Spring Kafka. На Producer события публикуются успешно в топи c, и потребитель назначил топи c не получать контент. Функция @KafkaListener вызывается, но полученный список пуст. Просматривая журналы, можно заметить, что, когда производитель публикует sh какое-то событие, в то же время Потребитель получает события, но с пустым списком. Брокер и структура в порядке, потому что другие темы работают.

Потребитель:

@KafkaListener(topics = "${topic}")
public void listener(final List<ConsumerRecord<String, GenericRecord>> events) {
    LOGGER.info("Received {} events",  events.size());
}

public Map<String, Object> listenerConfiguration() {
    HashMap<String, Object> props = new HashMap<>();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServerUrl);
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryURL);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxRecordsPerPoll.toString());
    return props;
}

public ConsumerFactory<String, GenericRecord> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(listenerConfiguration());
}

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory(
            final TransactionConsumerRetry transactionConsumerRetry) {
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(Boolean.TRUE);
        factory.setAckDiscarded(true);
         return factory;
  }

вывод:

 2020-07-09 14:03:29,879 INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] class:Listener, Received 0 events
 2020-07-09 14:03:35,812 INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] class:Listener, Received 0 events
 2020-07-09 14:03:37,178 INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] class:Listener, Received 0 events

Производитель:

   public void sendEvent(final String topic, final String key, final GenericRecord value)
            throws ExecutionException, InterruptedException {
        try {
            kafkaTemplate.send(topic, key, value).get();
        } catch (Exception e) {
            String exceptionName = ExceptionUtils.getRootCause(e).getClass().getName();
            LOGGER.error("M=sendEvent, Msg=Error while sending event with ID: {}, Exception: {}", key, exceptionName);
            throw e;
        }
    }

   public ProducerFactory<String, GenericRecord> producerFactory() {
        return new DefaultKafkaProducerFactory<>(getProducerGenericRecordConfigurations());
    }

    private Map<String, Object> getProducerGenericRecordConfigurations() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServerUrl);
        configProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryURL);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        return configProps;
    }

Я проверил серверы, темы и конфигурации, все в порядке.

1 Ответ

0 голосов
/ 09 июля 2020

Я не уверен, почему вы получаете там пустой список (я этого не сделал, когда только что протестировал его, я получил список значений записи - введите стирание), но для пакетного прослушивателя (List<ConsumerRecord<?, ?>>) вы должны setBatchListener(true) на заводе контейнеров.

...