У нас возникают некоторые проблемы с Spring Cloud и Kafka, иногда наша микросервисная служба выдает UnkownProducerIdException
, это происходит из-за истечения срока действия параметра transactional.id.expiration.ms
на стороне брокера.
Мой вопрос, может Можно ли перехватить это исключение и повторить сообщение об ошибке? Если да, что может быть лучшим вариантом для этого?
Я посмотрел на:
- https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89068820
- Kafka UNKNOWN_PRODUCER_ID исключение
Мы используем версию Spring Cloud Hoxton.RELEASE
и версию Spring Kafka 2.2.4.RELEASE
Мы используем решение AWS Kafka, поэтому мы не можем установить новое значение для этого свойства Я упоминал ранее.
Вот некоторые следы исключения:
2020-04-07 20:54:00.563 ERROR 5188 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2] The broker returned org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception. for topic-partition test.produce.another-2 with producerId 35000, epoch 0, and sequence number 8
2020-04-07 20:54:00.563 INFO 5188 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2] ProducerId set to -1 with epoch -1
2020-04-07 20:54:00.565 ERROR 5188 --- [ad | producer-2] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='{...}' to topic <some-topic>:
Чтобы воспроизвести это исключение:
- я использовал сливное docker images и установите переменную окружения KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS
равной 10 секундам, чтобы я не стал слишком долго ждать появления этого исключения.
- В другом процессе отправлять одно за другим с интервалом 10 секунд 1 сообщение в топи c java будет прослушивать.
Вот пример кода:
Файл Привязки. java
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface Bindings {
@Input("test-input")
SubscribableChannel testListener();
@Output("test-output")
MessageChannel testProducer();
}
Файл application.yml (не забудьте установить переменную окружения KAFKA_HOST
):
spring:
cloud:
stream:
kafka:
binder:
auto-create-topics: true
brokers: ${KAFKA_HOST}
transaction:
producer:
error-channel-enabled: true
producer-properties:
acks: all
retry.backoff.ms: 200
linger.ms: 100
max.in.flight.requests.per.connection: 1
enable.idempotence: true
retries: 3
compression.type: snappy
request.timeout.ms: 5000
key.serializer: org.apache.kafka.common.serialization.StringSerializer
consumer-properties:
session.timeout.ms: 20000
max.poll.interval.ms: 350000
enable.auto.commit: true
allow.auto.create.topics: true
auto.commit.interval.ms: 12000
max.poll.records: 5
isolation.level: read_committed
configuration:
auto.offset.reset: latest
bindings:
test-input:
# contentType: text/plain
destination: test.produce
group: group-input
consumer:
maxAttempts: 3
startOffset: latest
autoCommitOnError: true
queueBufferingMaxMessages: 100000
autoCommitOffset: true
test-output:
# contentType: text/plain
destination: test.produce.another
group: group-output
producer:
acks: all
debug: true
Обработчик слушателя:
* 104 6 *
С уважением