Поток Kafka не повторяется при ошибке десериализации - PullRequest
0 голосов
/ 26 июня 2019

Spring cloud Поток Kafka не повторяется после ошибки десериализации даже после определенной конфигурации. Предполагается, что он должен повторить попытку на основе настроенной политики повторных попыток и в конце отправить сообщение о сбое в DLQ.

Конфигурация как показано ниже.

spring.cloud.stream.bindings.input_topic.consumer.maxAttempts=7
spring.cloud.stream.bindings.input_topic.consumer.backOffInitialInterval=500
spring.cloud.stream.bindings.input_topic.consumer.backOffMultiplier=10.0
spring.cloud.stream.bindings.input_topic.consumer.backOffMaxInterval=100000
spring.cloud.stream.bindings.iinput_topic.consumer.defaultRetryable=true
public interface MyStreams {

    String INPUT_TOPIC = "input_topic";
    String INPUT_TOPIC2 = "input_topic2";
    String ERROR = "apperror";
    String OUTPUT = "output";

    @Input(INPUT_TOPIC)
    KStream<String, InObject> inboundTopic();

    @Input(INPUT_TOPIC2)
    KStream<Object, InObject> inboundTOPIC2();

    @Output(OUTPUT)
    KStream<Object, outObject> outbound();

    @Output(ERROR)
    MessageChannel outboundError();
}

@StreamListener(MyStreams.INPUT_TOPIC)
    @SendTo(MyStreams.OUTPUT)
    public KStream<Key, outObject> processSwft(KStream<Key, InObject> myStream) {
        return myStream.mapValues(this::transform);
    }

metadataRetryOperations в KafkaTopicProvisioner.java всегда имеет значение null и, следовательно, создает новый RetryTemplate в afterPropertiesSet().

public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, KafkaProperties kafkaProperties) {
        Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null");
        this.adminClientProperties = kafkaProperties.buildAdminProperties();
        this.configurationProperties = kafkaBinderConfigurationProperties;
        this.normalalizeBootPropsWithBinder(this.adminClientProperties, kafkaProperties, kafkaBinderConfigurationProperties);
    }

    public void setMetadataRetryOperations(RetryOperations metadataRetryOperations) {
        this.metadataRetryOperations = metadataRetryOperations;
    }

    public void afterPropertiesSet() throws Exception {
        if (this.metadataRetryOperations == null) {
            RetryTemplate retryTemplate = new RetryTemplate();
            SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
            simpleRetryPolicy.setMaxAttempts(10);
            retryTemplate.setRetryPolicy(simpleRetryPolicy);
            ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
            backOffPolicy.setInitialInterval(100L);
            backOffPolicy.setMultiplier(2.0D);
            backOffPolicy.setMaxInterval(1000L);
            retryTemplate.setBackOffPolicy(backOffPolicy);
            this.metadataRetryOperations = retryTemplate;
        }

    }

Ответы [ 2 ]

0 голосов
/ 26 июня 2019

Конфигурация повтора работает только с связывателями на основе MessageChannel. Благодаря связующему KStream Spring просто помогает строить топологию предписанным способом, он не участвует в потоке сообщений после построения топологии.

Следующая версия spring-kafka (используемая компоновщиком) добавила RecoveringDeserializationExceptionHandler ( коммит здесь ); хотя это не может помочь при повторных попытках, его можно использовать с DeadLetterPublishingRecoverer отправкой записи в тему недоставленных сообщений.

Вы можете использовать RetryTemplate в своих процессорах / преобразователях, чтобы повторить определенные операции.

0 голосов
/ 26 июня 2019

Spring Cloud Поток Kafka не повторяется после ошибки десериализации даже после определенной конфигурации.

Видимое поведение соответствует настройкам по умолчанию потоков Kafka, когда он обнаруживает ошибку десериализации.

С https://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-records:

LogAndFailExceptionHandler реализует DeserializationExceptionHandler и является настройкой по умолчанию в Kafka Streams.Он обрабатывает любые возникшие исключения десериализации, регистрируя ошибку и выдавая фатальную ошибку, чтобы остановить приложение Streams.Если ваше приложение настроено на использование LogAndFailExceptionHandler, то экземпляр вашего приложения будет работать быстро, когда он обнаружит поврежденную запись, завершив ее сам.

Я не знаком с фасадом Spring для Kafka Streams, но вам, вероятно, нужно настроить желаемый org.apache.kafka.streams.errors.DeserializationExceptionHandler вместо настройки повторных попыток (они предназначены для другой цели).Или вы можете захотеть реализовать свой собственный, настраиваемый обработчик (см. Ссылку выше для получения дополнительной информации), а затем настроить Spring / KStreams для его использования.

...