Я занимаюсь разработкой приложения kafka-streams, но у меня возникают проблемы с переопределением стандартного ProductionExceptionHandler.Я создал класс, реализующий ProductionExceptionHandler
public class RtaCustomProcessingExceptionHandler implements ProductionExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(RtaCustomProcessingExceptionHandler.class);
private RtaHandlerClient handlerClient;
@Override
public void configure(Map<String, ?> map) {
handlerClient = RtaHandlerClient.getInstance();
}
@Override
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
log.debug("PASSING");
return ProductionExceptionHandlerResponse.CONTINUE;
}
}
, и добавил, что к моим свойствам
properties.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
RtaCustomProcessingExceptionHandler.class);
, которые я использую для создания экземпляра KafkaStreams
this.streams = new KafkaStreams(BasicTopology.createTopology(config), config.asProperties());
Когда приложение запускается, я вижу в журнале, что мой обработчик поднял
[2018-10-10 07:58:40,471] INFO StreamsConfig values:
application.id = xdr-0
application.server =
bootstrap.servers = [kafka-1:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 1000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.production.exception.handler = class com.ericsson.dcp.rtang.kafka.streams.xdr.error.handler.RtaCustomProcessingExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.WallclockTimestampExtractor
default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
metadata.max.age.ms = 300000
Но потом в журнале, кажется, он перезаписывается по какой-то странной причине, которую я не могу понять
[2018-10-10 07:58:40,958] INFO StreamsConfig values:
application.id = xdr-0
application.server =
bootstrap.servers = [kafka-1:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id = xdr-0-99215001-f1fd-43ae-8c3f-026cbd11d013-StreamThread-1-consumer
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
Кто-нибудь знает, почему это происходит?BR-Jiinxy
РЕДАКТИРОВАТЬ : Обновлено с фактической реализацией обработчика по запросу.
EDIT2 : Я настроил свое тестирование, поэтому я добавляю 10 элементов для обработкиКогда шестой элемент содержит значение, которое должно вызывать исключение, обычно мое собственное public class RtaRecoverableProcessingException extends ApiException
, но также RecordTooLargeException
, которое, насколько я понимаю, должно быть перехвачено пользовательским обработчиком.Я также добавил точки останова в RecordCollectorImpl.java в строке 162 (producer.send(..
и 166 (if(exception..
). Я вижу, что для первых пяти элементов процесс проходит правильно как в строке 162, так и в 166. Однако, когдаисключение - брошенная строка 166. Не пропущено. Из того, что я могу сказать, это должна быть пропущенная строка 166, даже если выброшено исключение, поскольку оно обрабатывается в строке 191 (else if (productionExceptionHandler.handle..
).