У меня есть простое весеннее облачное приложение kafka stream. Приложение завершается каждый раз, когда возникает исключение, и я не могу переписать это поведение. Желаемым результатом является постепенный откат, когда существуют определенные типы исключений или для продолжения других типов исключений. Я использую springCloudVersion - Hoxton.SR3
и spring boot: 2.2.6.RELEASE
application.yaml
spring:
cloud:
stream:
binders.process-in-0:
destination: test
kafka:
streams:
binder:
deserializationExceptionHandler: logAndContinue
configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
Beans
@Bean
public java.util.function.Consumer<KStream<String, String>> process() {
return input -> input.process(() -> new EventProcessor());
}
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
ContinueOnErrorHandler.class);
};
}
EventProcessor
public class EventProcessor implements Processor<String, String>, ProcessorSupplier<String, String> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(String key, String value) {
throw new RuntimeException("Some exception");
}
@Override
public void close() {
}
@Override
public Processor<String, String> get() {
return this;
}
}
ContinueOnErrorHandler
public class ContinueOnErrorHandler implements ProductionExceptionHandler {
@Override
public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
return ProductionExceptionHandlerResponse.CONTINUE;
}
@Override
public void configure(Map<String, ?> configs) {
//ignore
}
}