Как пропустить исключение сериализации Avro в KafkaStreams API? - PullRequest
0 голосов
/ 25 января 2020

У меня есть приложение Kafka, написанное KafkaStreams Java api. Он читает данные из Mysql binlog и делает некоторые вещи, которые не имеют отношения к моему вопросу. Проблема в том, что одна конкретная строка выдает ошибку в десериализации из avro. Я могу покопаться в файле схемы Avro и найти проблему, но в целом мне нужен простительный обработчик исключений, который при возникновении такой ошибки не останавливает все приложение. Это основная часть моего потокового приложения:

StreamsBuilder streamsBuilder = watchForCourierUpdate(builder);

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
        kafkaStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
    }

    private static StreamsBuilder watchForCourierUpdate(StreamsBuilder builder){
        CourierUpdateListener courierUpdateListener = new CourierUpdateListener(builder);
        courierUpdateListener.start();
        return builder;
    }

    private static Properties configProperties(){

        Properties streamProperties = new Properties();

        streamProperties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, Configs.getConfig("schemaRegistryUrl"));
        streamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "courier_app");
        streamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Configs.getConfig("bootstrapServerUrl"));
        streamProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        streamProperties.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/state_dir");
        streamProperties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "3");
        streamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        streamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        streamProperties.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
        streamProperties.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                CourierSerializationException.class);

        return streamProperties;

    }

Это мой класс CourierSerializationException:

public class CourierSerializationException implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> producerRecord, Exception e) {
        Logger.logError("Failed to de/serialize entity from " + producerRecord.topic() + " topic.\n" + e);
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

Тем не менее, всякий раз, когда возникает исключение авросериализации, поток закрывается, а приложение выполняет не продолжать Я что-то упустил!

1 Ответ

3 голосов
/ 26 января 2020

Вы пытались сделать это с помощью default.deserialization.exception.handler, предоставленного kafka? Вы можете использовать LogAndContinueExceptionHandler, который будет регистрировать и продолжать.

Я могу ошибаться, но я думаю, что создание Customexception путем реализации ProductionExceptionHandler работает только для ошибок, связанных с сетью на стороне kafka.

добавьте это в свойства и посмотрите, что произойдет:

> props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
...