Приложение Spring Cloud Kafka Stream завершается при исключении - PullRequest
0 голосов
/ 21 апреля 2020

У меня есть простое весеннее облачное приложение kafka stream. Приложение завершается каждый раз, когда возникает исключение, и я не могу переписать это поведение. Желаемым результатом является постепенный откат, когда существуют определенные типы исключений или для продолжения других типов исключений. Я использую springCloudVersion - Hoxton.SR3 и spring boot: 2.2.6.RELEASE

application.yaml

spring:
 cloud:
   stream:
     binders.process-in-0:
         destination: test

     kafka:
       streams:
         binder:
           deserializationExceptionHandler: logAndContinue
           configuration:
             default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
             default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde

Beans

@Bean
public java.util.function.Consumer<KStream<String, String>> process() {
    return input -> input.process(() -> new EventProcessor());
}

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                ContinueOnErrorHandler.class);
    };
}

EventProcessor

public class EventProcessor implements Processor<String, String>, ProcessorSupplier<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {

        throw new RuntimeException("Some exception");
    }

    @Override
    public void close() {

    }

    @Override
    public Processor<String, String> get() {
        return this;
    }
}

ContinueOnErrorHandler

public class ContinueOnErrorHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
        return ProductionExceptionHandlerResponse.CONTINUE;
}

    @Override
    public void configure(Map<String, ?> configs) {
        //ignore
    }
}

1 Ответ

0 голосов
/ 22 апреля 2020

Пользовательский процессор, который вы используете от потребителя, выбрасывает RuntimeException в методе process. Ничего не поймано. Когда выдается это исключение, приложение просто завершается.

Используемый вами обработчик производственного исключения не имеет никакого эффекта, поскольку вы ничего здесь не производите. Consumer ничего не производит. Если у вас есть сценарий производства чего-либо, вы должны вместо этого переключиться на java.util.funciton.Function.

Чтобы устранить эту проблему здесь, когда вы обрабатываете запись в пользовательском процессоре (EventProcessor), если вы получаете исключение, вы должны перехватить его и предпринять соответствующие действия. Например, вот шаблон:

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(String key, String value) {
            try {
                // start processing
                // exception thrown

            }
            catch (Exception e){
                // Take the appropriate action
            }
        }

Таким образом, приложение не будет завершено, когда в процессоре будет сгенерировано исключение.

...