Я выполняю локально Продюсер и Потребитель, используя Spring Kafka. На Producer события публикуются успешно в топи c, и потребитель назначил топи c не получать контент. Функция @KafkaListener вызывается, но полученный список пуст. Просматривая журналы, можно заметить, что, когда производитель публикует sh какое-то событие, в то же время Потребитель получает события, но с пустым списком. Брокер и структура в порядке, потому что другие темы работают.
Потребитель:
@KafkaListener(topics = "${topic}")
public void listener(final List<ConsumerRecord<String, GenericRecord>> events) {
LOGGER.info("Received {} events", events.size());
}
public Map<String, Object> listenerConfiguration() {
HashMap<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServerUrl);
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryURL);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxRecordsPerPoll.toString());
return props;
}
public ConsumerFactory<String, GenericRecord> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(listenerConfiguration());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory(
final TransactionConsumerRetry transactionConsumerRetry) {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(Boolean.TRUE);
factory.setAckDiscarded(true);
return factory;
}
вывод:
2020-07-09 14:03:29,879 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] class:Listener, Received 0 events
2020-07-09 14:03:35,812 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] class:Listener, Received 0 events
2020-07-09 14:03:37,178 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] class:Listener, Received 0 events
Производитель:
public void sendEvent(final String topic, final String key, final GenericRecord value)
throws ExecutionException, InterruptedException {
try {
kafkaTemplate.send(topic, key, value).get();
} catch (Exception e) {
String exceptionName = ExceptionUtils.getRootCause(e).getClass().getName();
LOGGER.error("M=sendEvent, Msg=Error while sending event with ID: {}, Exception: {}", key, exceptionName);
throw e;
}
}
public ProducerFactory<String, GenericRecord> producerFactory() {
return new DefaultKafkaProducerFactory<>(getProducerGenericRecordConfigurations());
}
private Map<String, Object> getProducerGenericRecordConfigurations() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServerUrl);
configProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryURL);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
return configProps;
}
Я проверил серверы, темы и конфигурации, все в порядке.