Я пытаюсь использовать ReplyingKafkaTemplate и периодически вижу сообщение ниже.
Нет ожидающего ответа: ConsumerRecord (topic = request-reply-topic, partition = 8, offset = 1,CreateTime = 1544653843269, размер сериализованного ключа = -1, размер сериализованного значения = 1609, заголовки = RecordHeaders (заголовки = [RecordHeader (ключ = kafka_correlationId, значение = [-14, 65, 21, -118, 70, -94, 72,87, -113, -91, 92, 72, -124, -110, -64, -94])], isReadOnly = false), ключ = ноль, с id_корреляции: [-18271255759235816475365319231847350110], возможно, тайм-аут или использованиеобщая тема ответа
Это будет происходить из приведенного ниже кода
RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
if (future == null) {
if (this.sharedReplyTopic) {
if (this.logger.isDebugEnabled()) {
this.logger.debug(missingCorrelationLogMessage(record, correlationId));
}
}
else if (this.logger.isErrorEnabled()) {
this.logger.error(missingCorrelationLogMessage(record, correlationId));
}
}
Но это происходит только случайно
Я также установил для параметра replyTopic значение false, как показано нижеи попытался увеличить время ожидания
ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate = new ReplyingKafkaTemplate<>(pf, container);
replyKafkaTemplate.setSharedReplyTopic(false);
replyKafkaTemplate.setReplyTimeout(10000);
return replyKafkaTemplate;
Мой Контейнер такой, как показано ниже
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(false);
factory.getContainerProperties().setPollTimeout(1000);
factory.getContainerProperties().setIdleEventInterval(10000L);
factory.setConcurrency(3);
factory.setReplyTemplate(kafkaTemplate());
return factory;
}