Лучший способ обработки ошибок в Kafka Consumer - PullRequest
0 голосов
/ 03 августа 2020

У меня есть приложение Springboot, настроенное с помощью spring-kafka, где я хочу обрабатывать всевозможные ошибки, которые могут возникнуть при прослушивании topi c. Если какое-либо сообщение пропущено / не может быть использовано из-за десериализации или любого другого исключения, будет выполнено 2 попытки, после чего сообщение должно быть записано в файл ошибки. У меня есть два подхода, которым можно следовать: -

Первый подход (использование SeekToCurrentErrorHandler с DeadLetterPublishingRecoverer): -

@Autowired
KafkaTemplate<String,Object> template;

@Bean(name = "kafkaSourceProvider")
public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
        Map<String, Object> config = appProperties.getSource()
                .getProperties();
        ConcurrentKafkaListenerContainerFactory<K, V> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));

        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
                (r, e) -> {
                    if (e instanceof FooException) {
                        return new TopicPartition(r.topic() + ".DLT", r.partition());
                    }
                });
        ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));

        factory.setErrorHandler(errorHandler);
        return factory;
    }

Но для этого нам нужно добавить topi c (новый .DLT topi c), а затем мы можем записать его в файл.

@Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
        return new KafkaAdmin(configs);
    }
    
@KafkaListener( topics = MY_TOPIC + ".DLT", groupId = MY_ID)
public void listenDlt(ConsumerRecord<String, SomeClassName> consumerRecord,
    @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String exceptionStackTrace) {

    logger.error(exceptionStackTrace);
}

Подход 2 (с использованием настраиваемого SeekToCurrentErrorHandler): -

@Bean
    public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
        Map<String, Object> config = appProperties.getSource()
                .getProperties();
        ConcurrentKafkaListenerContainerFactory<K, V> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
        
        factory.setErrorHandler(new CustomSeekToCurrentErrorHandler());
        factory.setRetryTemplate(retryTemplate());
        return factory;
    }

private RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(backOffPolicy());
    retryTemplate.setRetryPolicy(aSimpleReturnPolicy);
}

public class CustomSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

private static final int MAX_RETRY_ATTEMPTS = 2;

CustomSeekToCurrentErrorHandler() {
    super(MAX_RETRY_ATTEMPTS);
}

@Override
public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
    try {
        if (!records.isEmpty()) {
            log.warn("Exception: {} occurred with message: {}", exception, exception.getMessage());
            
            super.handle(exception, records, consumer, container);
        }
    } catch (SerializationException e) {
        log.warn("Exception: {} occurred with message: {}", e, e.getMessage());
    }
}

}

Может ли кто-нибудь высказать свои предложения относительно стандартного способа реализации такой функции. В первом подходе мы видим накладные расходы на создание тем .DLT и дополнительный @KafkaListener. Во втором подходе мы можем напрямую регистрировать исключение нашей потребительской записи.

Ответы [ 2 ]

1 голос
/ 03 августа 2020

При первом подходе нет необходимости использовать DeadLetterPublishingRecoverer, вы можете использовать любой ConsumerRecordRecoverer, который хотите; на самом деле средство восстановления по умолчанию просто регистрирует сообщение об ошибке.

/**
 * Construct an instance with the default recoverer which simply logs the record after
 * the backOff returns STOP for a topic/partition/offset.
 * @param backOff the {@link BackOff}.
 * @since 2.3
 */
public SeekToCurrentErrorHandler(BackOff backOff) {
    this(null, backOff);
}

И в FailedRecordTracker ...

if (recoverer == null) {
    this.recoverer = (rec, thr) -> {
        
        ...

        logger.error(thr, "Backoff "
            + (failedRecord == null
                ? "none"
                : failedRecord.getBackOffExecution())
            + " exhausted for " + ListenerUtils.recordToString(rec));
    };
}

было добавлено отсрочку (и ограничение на количество повторных попыток) обработчик ошибок после добавления повтора в адаптер прослушивателя, поэтому он «новее» (и предпочтительнее).

Кроме того, использование повтора в памяти может вызвать проблемы с перебалансировкой, если используются длинные BackOff s.

Наконец, только SeekToCurrentErrorHandler может решать проблемы десериализации (через ErrorHandlingDeserializer).

EDIT

Используйте ErrorHandlingDeserializer вместе с а SeekToCurrentErrorHandler. Исключения десериализации считаются фатальными, и средство восстановления вызывается немедленно.

См. документацию .

Вот простое приложение Spring Boot, которое демонстрирует это:

public class So63236346Application {


    private static final Logger log = LoggerFactory.getLogger(So63236346Application.class);


    public static void main(String[] args) {
        SpringApplication.run(So63236346Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63236346").partitions(1).replicas(1).build();
    }

    @Bean
    ErrorHandler errorHandler() {
        return new SeekToCurrentErrorHandler((rec, ex) -> log.error(ListenerUtils.recordToString(rec, true) + "\n"
                + ex.getMessage()));
    }

    @KafkaListener(id = "so63236346", topics = "so63236346")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so63236346", "{\"field\":\"value1\"}");
            template.send("so63236346", "junk");
            template.send("so63236346", "{\"field\":\"value2\"}");
        };
    }

}
package com.example.demo;

public class Thing {

    private String field;

    public Thing() {
    }

    public Thing(String field) {
        this.field = field;
    }

    public String getField() {
        return this.field;
    }

    public void setField(String field) {
        this.field = field;
    }

    @Override
    public String toString() {
        return "Thing [field=" + this.field + "]";
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Thing

Результат

Thing [field=value1]
2020-08-10 14:30:14.780 ERROR 78857 --- [o63236346-0-C-1] com.example.demo.So63236346Application   : so63236346-0@7
Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[106, 117, 110, 107]] from topic [so63236346]
2020-08-10 14:30:14.782  INFO 78857 --- [o63236346-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so63236346-1, groupId=so63236346] Seeking to offset 8 for partition so63236346-0
Thing [field=value2]
0 голосов
/ 07 августа 2020

Ожидалось, что будет регистрироваться любое исключение, которое мы могли бы получить на уровне контейнера, а также на уровне слушателя.

Без повторных попыток я сделал следующую обработку ошибок: -

Если мы обнаружим какое-либо исключение на уровне контейнера , мы должны иметь возможность регистрировать полезную нагрузку сообщения с описанием ошибки и искать это смещение, пропускать его и go вперед, получая следующее смещение. Хотя это делается только для DeserializationException, для остальных исключений также необходимо выполнять поиск, и для них нужно пропускать смещения.

@Component
public class KafkaContainerErrorHandler implements ErrorHandler {

    private static final Logger logger = LoggerFactory.getLogger(KafkaContainerErrorHandler.class);

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];

        // modify below logic according to your topic nomenclature
        String topics = s.substring(0, s.lastIndexOf('-'));
        int offset = Integer.parseInt(s.split("offset ")[1]);
        int partition = Integer.parseInt(s.substring(s.lastIndexOf('-') + 1).split(" at")[0]);

        logger.error("...")
        TopicPartition topicPartition = new TopicPartition(topics, partition);
        logger.info("Skipping {} - {} offset {}",  topics, partition, offset);
        consumer.seek(topicPartition, offset + 1);
    }

    @Override
    public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {

    }
}


 factory.setErrorHandler(kafkaContainerErrorHandler);

Если мы получим какое-либо исключение на уровне @KafkaListener, то я настройка моего слушателя с помощью моего собственного обработчика ошибок и регистрация исключения с сообщением, как показано ниже: -

@Bean("customErrorHandler")
    public KafkaListenerErrorHandler listenerErrorHandler() {
        return (m, e) -> {
            logger.error(...);
            return m;
        };
    }
...