Spring Boot Kafka потребитель отстает и читает неправильно - PullRequest
0 голосов
/ 31 октября 2019

Я использую Spring Boot 2.2 вместе с кластером Kafka (диаграмма bitnami helm). И получить довольно странное поведение.

Наличие весеннего загрузочного приложения с несколькими пользователями по нескольким темам.

Вызов kafka-consumer-groups.sh --bootstrap-server localhost: 9092 --describe-group my-app дает:

GROUP           TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-app event.topic-a                 0          2365079         2365090         11              consumer-4-0c9a5616-3e96-413b-b770-b813c3d38a28 /10.244.3.47    consumer-4
my-app event.topic-a                 1          2365080         2365091         11              consumer-4-0c9a5616-3e96-413b-b770-b813c3d38a28 /10.244.3.47    consumer-4
my-app batch.topic-a                 0          278363          278363          0               consumer-3-14cb199e-646f-46ad-8ee2-98f37107fa37 /10.244.3.47    consumer-3
my-app batch.topic-a                 1          278362          278362          0               consumer-3-14cb199e-646f-46ad-8ee2-98f37107fa37 /10.244.3.47    consumer-3
my-app batch.topic-b                 0          1434            1434            0               consumer-5-a2f940c8-75e6-43d2-8d79-77d03e1ad640 /10.244.3.47    consumer-5
my-app event.topic-b                 0          2530            2530            0               consumer-6-45a32d6d-eac9-4abe-b14f-47173338e62c /10.244.3.47    consumer-6
my-app batch.topic-c                 0          1779            1779            0               consumer-1-d935a29f-ad3c-4292-9ace-5efdfff864d6 /10.244.3.47    consumer-1
my-app event.topic-c                 0          12308           13502           1194            -                                               -               - 

Повторный вызов дает

GROUP           TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-app event.topic-a                 0          2365230         2365245         15              consumer-4-0c9a5616-3e96-413b-b770-b813c3d38a28 /10.244.3.47    consumer-4
my-app event.topic-a                 1          2365231         2365246         15              consumer-4-0c9a5616-3e96-413b-b770-b813c3d38a28 /10.244.3.47    consumer-4
my-app batch.topic-a                 0          278363          278363          0               consumer-3-14cb199e-646f-46ad-8ee2-98f37107fa37 /10.244.3.47    consumer-3
my-app batch.topic-a                 1          278362          278362          0               consumer-3-14cb199e-646f-46ad-8ee2-98f37107fa37 /10.244.3.47    consumer-3
my-app batch.topic-b                 0          1434            1434            0               consumer-5-a2f940c8-75e6-43d2-8d79-77d03e1ad640 /10.244.3.47    consumer-5
my-app event.topic-b                 0          2530            2530            0               consumer-6-45a32d6d-eac9-4abe-b14f-47173338e62c /10.244.3.47    consumer-6
my-app batch.topic-c                 0          1779            1779            0               consumer-1-d935a29f-ad3c-4292-9ace-5efdfff864d6 /10.244.3.47    consumer-1
my-app event.topic-c                 0          12308           13505           1197            consumer-2-d52e2b96-f08c-4247-b827-4464a305cb20 /10.244.3.47    consumer-2

Как вы могли видеть, потребитель для event.topic-c сейчастам но отстает 1197 записей. Само приложение читает из темы, но всегда одни и те же события (похоже на величину лага), но смещение не меняется. Я не получаю ошибок или записей в журнале, ни на кафке, ни на весенней загрузке. Все, что у меня есть, для этой конкретной темы одни и те же события обрабатываются снова и снова ..... все другие темы в приложении работают правильно.

Вот клиентская конфигурация:

allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [kafka:9092]
check.crcs = true
client.dns.lookup = default
client.id = 
client.rack = 
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = sap-integration
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

Любая идея ... Я немного потерян ..

Редактировать: Конфигурация Spring довольно стандартна:

configProps[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
configProps[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
configProps[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = MyJsonDeserializer::class.java
configProps[JsonDeserializer.TRUSTED_PACKAGES] = "*"

Вот несколько примеров из журналов:

2019-11-01 18:39:46.268 DEBUG 1 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=..., headers={kafka_offset=37603361, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6ca11277, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=topic-c, kafka_receivedTimestamp=1572633584589, kafka_groupId=my-app}]]
2019-11-01 18:39:46.268 DEBUG 1 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=..., headers={kafka_offset=37603362, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6ca11277, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=topic-c, kafka_receivedTimestamp=1572633584635, kafka_groupId=my-app}]]
2019-11-01 18:39:46.268 DEBUG 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {topic-c-0=OffsetAndMetadata{offset=37603363, leaderEpoch=null, metadata=''}}
2019-11-01 18:39:46.268 DEBUG 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {topic-c-0=OffsetAndMetadata{offset=37603363, leaderEpoch=null, metadata=''}}
....
2019-11-01 18:39:51.475 DEBUG 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2019-11-01 18:39:51.475 DEBUG 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {}

в то время как потребитель осуществляет laging

GROUP           TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-app          topic-c              0          37603363        37720873       117510          consumer-3-2b8499c0-7304-4906-97f8-9c0f6088c469 /10.244.3.64    consumer-3

Нет ошибок, нет предупреждений ... просто больше сообщений нет ...

Thx

1 Ответ

1 голос
/ 01 ноября 2019

Вам нужно искать логи, как это ...

2019-11-01 16: 33: 31.825 INFO 35182 --- [kgh1231-0-C-1] oakccinternals.AbstractCoordinator: [Consumer clientId = customer-2, groupId = kgh1231] (Повторно) присоединяющаяся группа

...

2019-11-01 16: 33: 31.872 INFO 35182 --- [kgh1231-0-C-1] osklKafkaMessageListenerContainer: назначенные разделы: [kgh1231-0, kgh1231-2, kgh1231-1, kgh1231-4, kgh1231-3]

...

2019-11-01 16: 33: 31.897 DEBUG 35182 --- [kgh1231-0-C-1] essageListenerContainer $ ListenerConsumer: получено: 10 записей

...

2019-11-01 16: 33: 31.902 DEBUG 35182 --- [kgh1231-0-C-1] .a.RecordMessagingMessageListenerAdapter: Обработка [GenericMessage [payload = foo1, заголовки = {kafka_offset = 80, kafka_consumer = org.apache.kafka.clients. consumer.KafkaConsumer@3d00c543, kafka_timestampType = CREATE_TIME, kafka_receivedMessageKey = null, kafka_receivedPartitionId = 0, kafka_receivedTopic = kgh1231, kafka_receivedTimestamp9 157 7 159 157]

...

2019-11-01 16: 33: 31.906 ОТЛАДКА 35182 --- [kgh1231-0-C-1] .a.RecordMessagingMessageListenerAdapter: Обработка [GenericMessage [полезная нагрузка]= foo5, заголовки = {kafka_offset = 61, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3d00c543, kafka_timestampType = create_time, kafka_receivedMessageKey = нуль, kafka_receivedPartitionId = 3, kafka_receivedTopic = kgh1231, kafka_receivedTimestamp = 1572640411870}]]

2019-11-01 16: 33: 31.907 DEBUG 35182 --- [kgh1231-0-C-1] essageListenerContainer $ ListenerConsumer: список фиксации: {kgh1231-0 = OffsetAndMetadata {offset = 82, metadata = ''}, kgh1231-2 = OffsetAndMetadata {offset = 62, metadata = ''}, kgh1231-1 = OffsetAndMetadata {offset = 62, metadata = ''}, kgh1231-4 = OffsetAndMetadata {offset = 62, metadata = ''},kgh1231-3 = OffsetAndMetadata {offset = 62, metadata = ''}}

2019-11-01 16: 33: 31.908 DEBUG 35182 --- [kgh1231-0-C-1] essageListenerContainer $ ListenerConsumer:Фиксация: {kgh1231-0 = OffsetAndMetadata {offset = 82, metadata = ''}, kgh1231-2 = OffsetAndMetadata {offset = 62, metadata = ''}, kgh1231-1 = OffsetAndMetadata {offset = 62, metadata = ''}, kgh1231-4 = OffsetAndMetadata {offset = 62, metadata = ''},kgh1231-3 = OffsetAndMetadata {offset = 62, metadata = ''}}

Если вы не видите ничего подобного, значит ваш потребитель не настроен должным образом.

Если выне могу понять, опубликовать свой журнал где-нибудь, как PasteBin.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...