Почему Kafka Streams сбрасывает смещения в ноль после обновления Kafka Server 2.4? - PullRequest
0 голосов
/ 20 февраля 2020

Описание проблемы:

У меня есть kafka topi c с 12 разделами, с которых запрашивает пользовательское приложение. После обновления брокера Kafka до версии 2.4 с версии 2.2.1 я заметил, что задержка приложения потребителя превращается в большее число и снова обрабатывает записи со смещения нуля.

Topi c конфигурация:

cleanup.policy=compact
delete.retention.ms=100
segment.ms=100
min.cleanable.dirty.ratio=0.01 

Потоковые библиотеки приложений:

kafka-streams (version 2.0.1)
spring-cloud-stream-binder-kafka-streams (version 2.1.2.RELEASE)
Spring-kafka (version 2.2.6.RELEASE)

Журналы приложений:

[Consumer clientId=web-streaming-application-8023d9d2-e761-41c5-9b90-d586a3bbe095-StreamThread-1-consumer, groupId=web-consumer-application] Resetting offset for partition web-streaming-application_employees-4 to offset 0.
[Consumer clientId=web-streaming-application-8023d9d2-e761-41c5-9b90-d586a3bbe095-StreamThread-1-consumer, groupId=web-streaming-application] Fetch offset 3378632 is out of range for partition web-streaming-application_employees-4, resetting offset

Журналы и метрики брокеров Kafka в порядке, и проблем с репликацией нет. У меня нет доказательств того, что обновление вызывает проблемы. Поэтому я прошу вашей помощи, чтобы понять, что происходит.

Любые мысли приветствуются.

Конфигурация приложения:

spring:
  application:
    name: web-streaming-application
  kafka:
    properties:
      security.protocol: SASL_PLAINTEXT
      sasl.mechanism: GSSAPI
      sasl.kerberos.service.name: kafka
      bootstrap.servers: ${kafka.bootstrap-servers}
      value.serializer: org.apache.kafka.connect.json.JsonSerializer
  cloud.stream:
    kafka.streams:
      binder:
        brokers: ${kafka.bootstrap-servers}
        configuration:
          security.protocol: SASL_PLAINTEXT
          sasl.mechanism: GSSAPI
          sasl.kerberos.service.name: kafka
          default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        serdeError: logAndContinue
      bindings:
        employeeInput:
          consumer:
            value-serde: business.serde.EmployeeJsonSerde
            application.id: ${spring.application.name}
    bindings:
      employeeInput:
        destination: ${kafka.topics.employee}
        group: ${spring.application.name}
...