У меня есть требование отправить запрос и ответ моего микросервиса брокеру Kafka.
Я могу сделать это успешно с локальной установкой Kafka. Но когда я перевожу его в более высокую среду, онзавершается с ошибкой, приведенной ниже.
o.a.k.clients.producer.KafkaProducer: [Producer clientId=producer-9]
Closing the Kafka producer with
timeoutMillis = 0 ms.",
"source":"stdout",
"tag":"PromiseEngine-Staging-aafa49a1ea22"}
Я не установил тайм-аут на ноль, и я мог видеть, что в настройках производителя таймаут составляет 30000 мс.
Также я реализовал обратные вызовы об успешном завершении и сбоеи никто из них не будет казнен.
@Autowired
KafkaTemplate<String, KafkaMessageDTO> kafkaTemplate;
private void publishToKafkaTopic(String topic,
SchedulingCallRequestDTO request, ScheduleOrderResponseDTOv2 response) {
log.debug("entered publishToKafkaTopic: " + CommonUtils.getJson(response));
ListenableFuture<SendResult<String, KafkaMessageDTO>> future =
send(topic,new KafkaMessageDTO(request, response));
future.addCallback(
new ListenableFutureCallback<SendResult<String, KafkaMessageDTO>>() {
@Override
public void onSuccess(SendResult<String, KafkaMessageDTO> result) {
log.info("Eequest and Response pushed to Kafka: " +
CommonUtils.getJson(result.getProducerRecord().value()));
}
@Override
public void onFailure(Throwable ex) {
log.error("Error occured while pushing to Kafka." + " {}",
CommonUtils.getJson(new KafkaMessageDTO(request, response)));
}
});
}
private ListenableFuture<SendResult<String, KafkaMessageDTO>> send(String topic, KafkaMessageDTO message) {
log.debug("entered send: " + CommonUtils.getJson(message));
ListenableFuture<SendResult<String, KafkaMessageDTO>> future = kafkaTemplate
.send(generateProducerRecord(topic, message));
return future;
}
Может ли кто-нибудь помочь мне с этим