У меня есть следующий слушатель - производитель в Spring Cloud Stream:
@StreamListener(target = MultipleProcessor.DOTCONN_INPUT, condition= "headers['kafka_receivedTopic']=='dotconnectorissues'")
public void inputDot(Message<DotConnectorIssue> messageIn) {
DotConnectorIssue data = messageIn.getPayload();
ObjectMapper mapper = new ObjectMapper();
DotConnectorUpdateDto dataMapped = new DotConnectorUpdateDto(data);
if (dataMapped.getPlantCode().equals(plantCode)) {
log.info("incoming dotConnectorIssue " + data);
try {
Message<String> messageOut = MessageBuilder
.withPayload(mapper.writeValueAsString(dataMapped))
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.setHeader("type", "dotconnectorissue")
.build();
boolean send = ehProcessor.outputAndon().send(messageOut, 15000L);
log.info("sent message: "+ send);
if (!send) messagePool.getPool().add(messageOut);
} catch (JsonProcessingException e) {
log.error("error during creating json", e);
}
}
}
фрагмент кода работает, но иногда сообщение не может быть отправлено из-за следующей ошибки:
[kafka-producer-network-thread | producer-2] ERROR o.s.k.s.LoggingProducerListener.onError -
Exception thrown when sending a message with key='null' and payload='{123, 34, 116, 121, 112, 101, 34, 58, 34, 85, 80, 68, 65, 84, 69, 95, 68, 79, 84, 67, 79, 78, 78, 69...' to topic andon:
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
также, если переменная отправки имеет значение true.
Как я могу обработать ошибку NetworkException в Spring Cloud Stream?