У меня есть приложение для потоковой передачи kafka, которое отображает / преобразует сообщение json и направляет вывод в тему.
KStream<String, String> logMessageStream = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));
logMessageStream.map((k, v) -> { //Map record
try { // Map record to (requestId, message)
// readValue throws IOException, JsonParseException, JsonMappingException
LogMessage logMessage = objectMapper.readValue(v, LogMessage.class);
return new KeyValue<>(logMessage.requestId(), logMessage);
} catch (IOException e) {
e.printStackTrace();
}
return null; // <== RETURNS null due to caught exception
}).toStream().to(outoutTopic)
теперь я получу ошибку разбора, если входная запись json содержит неверный синтаксис, потоковое приложение вылетает с:
java.lang.NullPointerException
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
....
Я хочу использовать эту ошибку при отображении и продолжитьобработка для другого сообщения.Есть ли какой-либо обработчик, который я могу установить для потребителя исключения.Ищу предложения.
Спасибо ..