Кафка - Как пропустить плохое сообщение в смещении и использовать остальное - PullRequest
0 голосов
/ 26 апреля 2018

Я использую Kafka с Avro для сериализации / десериализации для обработки событий. Если случайно в теме попадут плохие данные, не соответствующие схеме avro,

.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer app: host: dcId: envId: url: reqId: jsess: secSessId: logUser: effUser: impUser: channelName: - Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition EventProcessor-0 at offset 2845. If needed, please seek past the record to continue consumption.

и сообщение продолжает расти для того же смещения. Есть ли возможность просто пропустить это смещение и продолжить чтение с дальнейших смещений, и если то же самое произойдет снова, пропустить это тоже?

Код потребителя:

@KafkaListener(topics = "EventProcessor", containerFactory = "eventProcessorListenerContainerFactory")
    public void listen(Event payLoad) {

        System.out.println("REceived  message ===> " + payLoad);

    }

Фабрика:

@Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Event>> eventProcessorListenerContainerFactory() {

        Map<String, Object> propMap = new HashMap<String, Object>();
        propMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        propMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propMap.put(ConsumerConfig.GROUP_ID_CONFIG, "EventProcessorConsumer");

        DefaultKafkaConsumerFactory<String, Event> consuemrFactory = new DefaultKafkaConsumerFactory<String, Event>(
                propMap);

        consuemrFactory.setValueDeserializer(new AvroDeSerializer<Event>(
                Event.class));
        ConcurrentKafkaListenerContainerFactory<String, Event> listenerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        listenerFactory.setConsumerFactory(consuemrFactory);
        listenerFactory.setConcurrency(3);
        listenerFactory.setRetryTemplate(retryTemplate());
        listenerFactory.getContainerProperties().setPollTimeout(3000);
        return listenerFactory;
    }

1 Ответ

0 голосов
/ 26 апреля 2018

Попробуйте изменить политику в соответствии с предложением @Poppy

SimpleRetryPolicy policy = new SimpleRetryPolicy();
// Set the max retry attempts
policy.setMaxAttempts(5);
// Retry on all exceptions (this is the default)
policy.setRetryableExceptions(new Class[] {Exception.class});
// ... but never retry SerializationException
policy.setFatalExceptions(new Class[] {SerializationException.class}); //<-- here

// Use the policy...
RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(policy);
template.execute(new RetryCallback<Foo>() {
    public Foo doWithRetry(RetryContext context) {
        // business logic here
    }
});

Отсюда: https://docs.spring.io/spring-batch/3.0.x/reference/html/retry.html

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...