Обработка ошибки синтаксического анализа json без сбоя приложения процессора потоков Kafka - PullRequest
0 голосов
/ 20 мая 2019

У меня есть приложение для потоковой передачи kafka, которое отображает / преобразует сообщение json и направляет вывод в тему.

KStream<String, String> logMessageStream = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));
logMessageStream.map((k, v) -> { //Map record 
                try { // Map record to (requestId, message)
                    // readValue throws IOException, JsonParseException, JsonMappingException
                    LogMessage logMessage = objectMapper.readValue(v, LogMessage.class);
                    return new KeyValue<>(logMessage.requestId(), logMessage);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return null; // <== RETURNS null due to caught exception
}).toStream().to(outoutTopic)

теперь я получу ошибку разбора, если входная запись json содержит неверный синтаксис, потоковое приложение вылетает с:

java.lang.NullPointerException
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
    ....

Я хочу использовать эту ошибку при отображении и продолжитьобработка для другого сообщения.Есть ли какой-либо обработчик, который я могу установить для потребителя исключения.Ищу предложения.

Спасибо ..

Ответы [ 3 ]

1 голос
/ 20 мая 2019

Вы также можете воспользоваться свойством StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, как описано в https://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-records.

Properties streamsSettings = new Properties();
streamsSettings.put(
  StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
  LogAndContinueExceptionHandler.class.getName()
);
1 голос
/ 03 июня 2019

Вместо использования map() вы можете использовать flatMap(), что позволяет возвращать ноль элементов.Возвращение null из map() недопустимо, как указано в JavaDocs:

Предоставленный {@link KeyValueMapper} должен возвращать тип {@link KeyValue} и не должен возвращать {@code null}.

Обратите внимание, что flatMap() также не позволяет возвращать null.Но он принимает все, что вы можете повторить (например, Iterable).Например, вы можете вернуть Collections.singleton() в случае успеха и Collection.emptySet() в случае ошибки.

0 голосов
/ 20 мая 2019

Просто взгляните на setUncaughtExceptionHandler метод:

KafkaStreams streams = new KafkaStreams(topology, props);    
streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
        // your logic here
    });
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...