Как обрабатывать UnkownProducerIdException - PullRequest
0 голосов
/ 07 апреля 2020

У нас возникают некоторые проблемы с Spring Cloud и Kafka, иногда наша микросервисная служба выдает UnkownProducerIdException, это происходит из-за истечения срока действия параметра transactional.id.expiration.ms на стороне брокера.

Мой вопрос, может Можно ли перехватить это исключение и повторить сообщение об ошибке? Если да, что может быть лучшим вариантом для этого?

Я посмотрел на:
- https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89068820
- Kafka UNKNOWN_PRODUCER_ID исключение

Мы используем версию Spring Cloud Hoxton.RELEASE и версию Spring Kafka 2.2.4.RELEASE

Мы используем решение AWS Kafka, поэтому мы не можем установить новое значение для этого свойства Я упоминал ранее.

Вот некоторые следы исключения:

2020-04-07 20:54:00.563 ERROR 5188 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2] The broker returned org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception. for topic-partition test.produce.another-2 with producerId 35000, epoch 0, and sequence number 8
2020-04-07 20:54:00.563  INFO 5188 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2] ProducerId set to -1 with epoch -1
2020-04-07 20:54:00.565 ERROR 5188 --- [ad | producer-2] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{...}' to topic <some-topic>:

Чтобы воспроизвести это исключение:
- я использовал сливное docker images и установите переменную окружения KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS равной 10 секундам, чтобы я не стал слишком долго ждать появления этого исключения.
- В другом процессе отправлять одно за другим с интервалом 10 секунд 1 сообщение в топи c java будет прослушивать.

Вот пример кода:

Файл Привязки. java

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface Bindings {
  @Input("test-input")
  SubscribableChannel testListener();

  @Output("test-output")
  MessageChannel testProducer();
}

Файл application.yml (не забудьте установить переменную окружения KAFKA_HOST):

spring:
  cloud:
    stream:
      kafka:
        binder:
          auto-create-topics: true
          brokers: ${KAFKA_HOST}
          transaction:
            producer:
              error-channel-enabled: true
          producer-properties:
            acks: all
            retry.backoff.ms: 200
            linger.ms: 100
            max.in.flight.requests.per.connection: 1
            enable.idempotence: true
            retries: 3
            compression.type: snappy
            request.timeout.ms: 5000
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer-properties:
            session.timeout.ms: 20000
            max.poll.interval.ms: 350000
            enable.auto.commit: true
            allow.auto.create.topics: true
            auto.commit.interval.ms: 12000
            max.poll.records: 5
            isolation.level: read_committed
          configuration:
            auto.offset.reset: latest

      bindings:

        test-input:
          # contentType: text/plain
          destination: test.produce
          group: group-input
          consumer:
            maxAttempts: 3
            startOffset: latest
            autoCommitOnError: true
            queueBufferingMaxMessages: 100000
            autoCommitOffset: true


        test-output:
          # contentType: text/plain
          destination: test.produce.another
          group: group-output
          producer:
            acks: all

debug: true

Обработчик слушателя:

* 104 6 *

С уважением

1 Ответ

0 голосов
/ 08 апреля 2020
        } catch (UnknownProducerIdException e) {
            log.error("UnkownProducerIdException catched ", e);

Чтобы перехватывать там исключения, вам нужно установить свойство sync kafka Manufacturer (https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.3.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#kafka -producer-properties ). В противном случае ошибка возвращается асинхронно

Вы не должны «есть» исключение; он должен быть возвращен в контейнер, чтобы контейнер откатил транзакцию.

Кроме того,

        }catch (Exception e) {
            System.out.println("Commit failed " + e.getMessage());
        }

Фиксация выполняется контейнером после того, как слушатель потока возвращается в контейнер, так здесь вы никогда не увидите ошибку фиксации; опять же, вы должны разрешить распространению исключения обратно в контейнер.

Контейнер будет повторять доставку в соответствии с конфигурацией повторных попыток привязки потребителя.

...