Обработка исключений потоков Кафки - PullRequest
0 голосов
/ 10 октября 2018

Я занимаюсь разработкой приложения kafka-streams, но у меня возникают проблемы с переопределением стандартного ProductionExceptionHandler.Я создал класс, реализующий ProductionExceptionHandler

public class RtaCustomProcessingExceptionHandler implements ProductionExceptionHandler {
    private static final Logger log = LoggerFactory.getLogger(RtaCustomProcessingExceptionHandler.class);
    private RtaHandlerClient handlerClient;

    @Override
    public void configure(Map<String, ?> map) {
        handlerClient = RtaHandlerClient.getInstance();
    }

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        log.debug("PASSING");

        return ProductionExceptionHandlerResponse.CONTINUE;
    }
}

, и добавил, что к моим свойствам

        properties.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RtaCustomProcessingExceptionHandler.class);

, которые я использую для создания экземпляра KafkaStreams

this.streams = new KafkaStreams(BasicTopology.createTopology(config), config.asProperties());

Когда приложение запускается, я вижу в журнале, что мой обработчик поднял

[2018-10-10 07:58:40,471] INFO StreamsConfig values: 
    application.id = xdr-0
    application.server = 
    bootstrap.servers = [kafka-1:9092]
    buffered.records.per.partition = 1000
    cache.max.bytes.buffering = 10485760
    client.id = 
    commit.interval.ms = 1000
    connections.max.idle.ms = 540000
    default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
    default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
    default.production.exception.handler = class com.ericsson.dcp.rtang.kafka.streams.xdr.error.handler.RtaCustomProcessingExceptionHandler
    default.timestamp.extractor = class org.apache.kafka.streams.processor.WallclockTimestampExtractor
    default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
    metadata.max.age.ms = 300000

Но потом в журнале, кажется, он перезаписывается по какой-то странной причине, которую я не могу понять

 [2018-10-10 07:58:40,958] INFO StreamsConfig values: 
    application.id = xdr-0
    application.server = 
    bootstrap.servers = [kafka-1:9092]
    buffered.records.per.partition = 1000
    cache.max.bytes.buffering = 10485760
    client.id = xdr-0-99215001-f1fd-43ae-8c3f-026cbd11d013-StreamThread-1-consumer
    commit.interval.ms = 30000
    connections.max.idle.ms = 540000
    default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
    default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
    default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
    default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
    default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde

Кто-нибудь знает, почему это происходит?BR-Jiinxy

РЕДАКТИРОВАТЬ : Обновлено с фактической реализацией обработчика по запросу.

EDIT2 : Я настроил свое тестирование, поэтому я добавляю 10 элементов для обработкиКогда шестой элемент содержит значение, которое должно вызывать исключение, обычно мое собственное public class RtaRecoverableProcessingException extends ApiException, но также RecordTooLargeException, которое, насколько я понимаю, должно быть перехвачено пользовательским обработчиком.Я также добавил точки останова в RecordCollectorImpl.java в строке 162 (producer.send(.. и 166 (if(exception..). Я вижу, что для первых пяти элементов процесс проходит правильно как в строке 162, так и в 166. Однако, когдаисключение - брошенная строка 166. Не пропущено. Из того, что я могу сказать, это должна быть пропущенная строка 166, даже если выброшено исключение, поскольку оно обрабатывается в строке 191 (else if (productionExceptionHandler.handle..).

1 Ответ

0 голосов
/ 10 октября 2018

ваш обработчик исключений потоков kafka RtaCustomProcessingExceptionHandler правильный и должен работать.Вы можете проверить это, поместив точку останова в метод handle вашего ProductionExceptionHandler, и эмулировать исключительный случай (например, выбросить любое исключение во время обработки сообщения).

даже из вашей первой части журналов вы видите, что применяется default.deserialization.exception.handler.Вторая часть предоставленных журналов предназначена для внутренних нужд потоков kafka и принимает значения конфигурации по умолчанию (если вы сравните другие свойства, такие как commit.interval.ms, вы также увидите, что все свойства являются параметрами по умолчанию).

...