Spring Kafka Retry выбирает сообщения между различными разделами? - PullRequest
0 голосов
/ 26 апреля 2019

Я хотел бы знать, как Spring Kafka обрабатывает повторные попытки, если для одного экземпляра назначено несколько разделов.Spring Kafka продолжает повторять одно и то же сообщение в соответствии с политикой повторных попыток и политикой отката или повторяет попытку, а между попытками отправляет сообщения из других разделов?

Поведение:

A) сообщение о повторной попытке -> сообщение о повторной попытке -> сообщение о повторной попытке

B) сообщение о повторной попытке -> другое сообщение -> сообщение о повторной попытке -> сообщение о повторной попытке

Я рассмотрел другие вопросы, связанные с переполнением стека, которыепохоже, подтверждают, что при заданном одном разделе Spring Kafka не будет перемещаться в другое смещение, но не было никакой информации о том, что такое поведение, если для этого экземпляра было назначено несколько разделов.Я реализовал фабрику с шаблоном повторов и повтором с сохранением состояния.

@Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        ListenerExceptions listenerExceptions = new ListenerExceptions();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(KafkaProperties.CONCURRENCY);
        factory.getContainerProperties().setPollTimeout(KafkaProperties.POLL_TIMEOUT_VLAUE);
        factory.setRetryTemplate(retryTemplate());
        factory.setErrorHandler(new SeekToCurrentErrorHandler());
        factory.setStatefulRetry(true);
        factory.setRecoveryCallback((RetryContext context) -> listenerExceptions.recover(context));
        return factory;
    }

1 Ответ

1 голос
/ 26 апреля 2019

Конфигурация повтора из упомянутой фабрики делегируется в RetryingMessageListenerAdapter, логика которого выглядит следующим образом:

public void onMessage(final ConsumerRecord<K, V> record, final Acknowledgment acknowledgment,
        final Consumer<?, ?> consumer) {
    RetryState retryState = null;
    if (this.stateful) {
        retryState = new DefaultRetryState(record.topic() + "-" + record.partition() + "-" + record.offset());
    }
    getRetryTemplate().execute(context -> {
                context.setAttribute(CONTEXT_RECORD, record);
                switch (RetryingMessageListenerAdapter.this.delegateType) {
                    case ACKNOWLEDGING_CONSUMER_AWARE:
                        context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
                        context.setAttribute(CONTEXT_CONSUMER, consumer);
                        RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment, consumer);
                        break;
                    case ACKNOWLEDGING:
                        context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
                        RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment);
                        break;
                    case CONSUMER_AWARE:
                        context.setAttribute(CONTEXT_CONSUMER, consumer);
                        RetryingMessageListenerAdapter.this.delegate.onMessage(record, consumer);
                        break;
                    case SIMPLE:
                        RetryingMessageListenerAdapter.this.delegate.onMessage(record);
                }
                return null;
            },
            getRecoveryCallback(), retryState);
}

Итак, мы повторим попытку для каждого сообщения. В соответствии с рекомендациями Apache Kafka мы обрабатываем один раздел в одном потоке, поэтому каждая следующая запись в этом разделе не будет обрабатываться до тех пор, пока не будет исчерпана повторная попытка или вызов не будет успешным.

В зависимости от состояния нескольких разделов и конфигурации factory.setConcurrency(KafkaProperties.CONCURRENCY);, это может быть тот факт, что разные разделы обрабатываются в разных потоках. Поэтому может случиться так, что разные записи из разных разделов будут повторены одновременно. Просто потому, что повторная попытка связана с потоком и стеком вызовов.

...