Я использую Spring Boot 2.2 вместе с кластером Kafka (диаграмма bitnami helm). И получить довольно странное поведение.
Наличие весеннего загрузочного приложения с несколькими пользователями по нескольким темам.
Вызов kafka-consumer-groups.sh --bootstrap-server localhost: 9092 --describe-group my-app дает:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-app event.topic-a 0 2365079 2365090 11 consumer-4-0c9a5616-3e96-413b-b770-b813c3d38a28 /10.244.3.47 consumer-4
my-app event.topic-a 1 2365080 2365091 11 consumer-4-0c9a5616-3e96-413b-b770-b813c3d38a28 /10.244.3.47 consumer-4
my-app batch.topic-a 0 278363 278363 0 consumer-3-14cb199e-646f-46ad-8ee2-98f37107fa37 /10.244.3.47 consumer-3
my-app batch.topic-a 1 278362 278362 0 consumer-3-14cb199e-646f-46ad-8ee2-98f37107fa37 /10.244.3.47 consumer-3
my-app batch.topic-b 0 1434 1434 0 consumer-5-a2f940c8-75e6-43d2-8d79-77d03e1ad640 /10.244.3.47 consumer-5
my-app event.topic-b 0 2530 2530 0 consumer-6-45a32d6d-eac9-4abe-b14f-47173338e62c /10.244.3.47 consumer-6
my-app batch.topic-c 0 1779 1779 0 consumer-1-d935a29f-ad3c-4292-9ace-5efdfff864d6 /10.244.3.47 consumer-1
my-app event.topic-c 0 12308 13502 1194 - - -
Повторный вызов дает
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-app event.topic-a 0 2365230 2365245 15 consumer-4-0c9a5616-3e96-413b-b770-b813c3d38a28 /10.244.3.47 consumer-4
my-app event.topic-a 1 2365231 2365246 15 consumer-4-0c9a5616-3e96-413b-b770-b813c3d38a28 /10.244.3.47 consumer-4
my-app batch.topic-a 0 278363 278363 0 consumer-3-14cb199e-646f-46ad-8ee2-98f37107fa37 /10.244.3.47 consumer-3
my-app batch.topic-a 1 278362 278362 0 consumer-3-14cb199e-646f-46ad-8ee2-98f37107fa37 /10.244.3.47 consumer-3
my-app batch.topic-b 0 1434 1434 0 consumer-5-a2f940c8-75e6-43d2-8d79-77d03e1ad640 /10.244.3.47 consumer-5
my-app event.topic-b 0 2530 2530 0 consumer-6-45a32d6d-eac9-4abe-b14f-47173338e62c /10.244.3.47 consumer-6
my-app batch.topic-c 0 1779 1779 0 consumer-1-d935a29f-ad3c-4292-9ace-5efdfff864d6 /10.244.3.47 consumer-1
my-app event.topic-c 0 12308 13505 1197 consumer-2-d52e2b96-f08c-4247-b827-4464a305cb20 /10.244.3.47 consumer-2
Как вы могли видеть, потребитель для event.topic-c сейчастам но отстает 1197 записей. Само приложение читает из темы, но всегда одни и те же события (похоже на величину лага), но смещение не меняется. Я не получаю ошибок или записей в журнале, ни на кафке, ни на весенней загрузке. Все, что у меня есть, для этой конкретной темы одни и те же события обрабатываются снова и снова ..... все другие темы в приложении работают правильно.
Вот клиентская конфигурация:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [kafka:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = sap-integration
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
Любая идея ... Я немного потерян ..
Редактировать: Конфигурация Spring довольно стандартна:
configProps[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
configProps[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
configProps[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = MyJsonDeserializer::class.java
configProps[JsonDeserializer.TRUSTED_PACKAGES] = "*"
Вот несколько примеров из журналов:
2019-11-01 18:39:46.268 DEBUG 1 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=..., headers={kafka_offset=37603361, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6ca11277, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=topic-c, kafka_receivedTimestamp=1572633584589, kafka_groupId=my-app}]]
2019-11-01 18:39:46.268 DEBUG 1 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=..., headers={kafka_offset=37603362, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6ca11277, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=topic-c, kafka_receivedTimestamp=1572633584635, kafka_groupId=my-app}]]
2019-11-01 18:39:46.268 DEBUG 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {topic-c-0=OffsetAndMetadata{offset=37603363, leaderEpoch=null, metadata=''}}
2019-11-01 18:39:46.268 DEBUG 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {topic-c-0=OffsetAndMetadata{offset=37603363, leaderEpoch=null, metadata=''}}
....
2019-11-01 18:39:51.475 DEBUG 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2019-11-01 18:39:51.475 DEBUG 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {}
в то время как потребитель осуществляет laging
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-app topic-c 0 37603363 37720873 117510 consumer-3-2b8499c0-7304-4906-97f8-9c0f6088c469 /10.244.3.64 consumer-3
Нет ошибок, нет предупреждений ... просто больше сообщений нет ...
Thx