Описание проблемы:
У меня есть kafka topi c с 12 разделами, с которых запрашивает пользовательское приложение. После обновления брокера Kafka до версии 2.4 с версии 2.2.1 я заметил, что задержка приложения потребителя превращается в большее число и снова обрабатывает записи со смещения нуля.
Topi c конфигурация:
cleanup.policy=compact
delete.retention.ms=100
segment.ms=100
min.cleanable.dirty.ratio=0.01
Потоковые библиотеки приложений:
kafka-streams (version 2.0.1)
spring-cloud-stream-binder-kafka-streams (version 2.1.2.RELEASE)
Spring-kafka (version 2.2.6.RELEASE)
Журналы приложений:
[Consumer clientId=web-streaming-application-8023d9d2-e761-41c5-9b90-d586a3bbe095-StreamThread-1-consumer, groupId=web-consumer-application] Resetting offset for partition web-streaming-application_employees-4 to offset 0.
[Consumer clientId=web-streaming-application-8023d9d2-e761-41c5-9b90-d586a3bbe095-StreamThread-1-consumer, groupId=web-streaming-application] Fetch offset 3378632 is out of range for partition web-streaming-application_employees-4, resetting offset
Журналы и метрики брокеров Kafka в порядке, и проблем с репликацией нет. У меня нет доказательств того, что обновление вызывает проблемы. Поэтому я прошу вашей помощи, чтобы понять, что происходит.
Любые мысли приветствуются.
Конфигурация приложения:
spring:
application:
name: web-streaming-application
kafka:
properties:
security.protocol: SASL_PLAINTEXT
sasl.mechanism: GSSAPI
sasl.kerberos.service.name: kafka
bootstrap.servers: ${kafka.bootstrap-servers}
value.serializer: org.apache.kafka.connect.json.JsonSerializer
cloud.stream:
kafka.streams:
binder:
brokers: ${kafka.bootstrap-servers}
configuration:
security.protocol: SASL_PLAINTEXT
sasl.mechanism: GSSAPI
sasl.kerberos.service.name: kafka
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
serdeError: logAndContinue
bindings:
employeeInput:
consumer:
value-serde: business.serde.EmployeeJsonSerde
application.id: ${spring.application.name}
bindings:
employeeInput:
destination: ${kafka.topics.employee}
group: ${spring.application.name}