Camel-Kafka ConsumerCount падает до 1 (по умолчанию) от определенного значения - PullRequest
0 голосов
/ 24 сентября 2019

Я использую потребитель kafka в нашем приложении, интегрированном с Camel (Camel-Kafka).Мы потребляем сообщения и отправляем данные на сервер для обработки.Существует одна тема «XYZ», определенная с 30 разделами, и я определил 15 как количество потребителей на каждом узле-потребителе (всего 2 экземпляра) '

Конфигурация CamelConsumer

kafka.consumersCount=15
kafka.consumerStreams=15

Из журналов видно, что при запуске потребителя существует 15 потоков потребителя (скажем, на 1 узле), что соответствует настройке.

INFO  Camel (camel-1) thread #2 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 0 to topic XYZ
INFO  Camel (camel-1) thread #3 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 1 to topic XYZ
INFO  Camel (camel-1) thread #4 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 2 to topic XYZ
INFO  Camel (camel-1) thread #5 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 3 to topic XYZ
INFO  Camel (camel-1) thread #6 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 4 to topic XYZ
INFO  Camel (camel-1) thread #7 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 5 to topic XYZ
INFO  Camel (camel-1) thread #8 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 6 to topic XYZ
INFO  Camel (camel-1) thread #9 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 7 to topic XYZ
INFO  Camel (camel-1) thread #10 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 8 to topic XYZ
INFO  Camel (camel-1) thread #11 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 9 to topic XYZ
INFO  Camel (camel-1) thread #12 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 10 to topic XYZ
INFO  Camel (camel-1) thread #13 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 11 to topic XYZ
INFO  Camel (camel-1) thread #14 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 12 to topic XYZ
INFO  Camel (camel-1) thread #15 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 13 to topic XYZ
INFO  Camel (camel-1) thread #16 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 14 to topic XYZ

Если сервер перестает отвечать из-за проблем в сети илилюбой другой сценарий, когда сервер недоступен.Все потребители кафки начинают отписываться, что опять-таки является ожидаемым поведением (пока что это хорошо)

Примечание: Мы также определили Camel ThrottlingExceptionRoutePolicy, который выполняет проверку работоспособности на сервере передотправка потребленного сообщения.

Как только сервер вернулся и стал доступен, я вижу, что активны не все 15 потоков пользователей, а только 1. Из журналов ниже я вижу, что он получает подписку и отписывается один за другим.и, наконец, приложение работает только с одним счетчиком.

INFO  Camel (camel-1) thread #17 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 0 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [ListOfDefinedServers]
check.crcs = true
client.id = 
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = XYZ-GroupId-12345
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 5000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 40000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = HTTPS
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = cert.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = cert.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
INFO  Camel (camel-1) thread #17 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 0 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #18 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 1 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #18 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 1 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #19 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 2 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #19 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 2 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #20 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 3 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #20 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 3 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #21 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 4 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #21 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 4 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #22 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 5 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #22 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 5 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #23 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 6 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #23 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 6 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #24 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 7 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #24 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 7 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #25 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 8 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #25 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 8 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #26 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 9 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #26 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 9 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #27 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 10 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #27 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 10 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #28 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 11 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #28 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 11 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #29 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 12 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #29 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 12 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #30 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 13 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO  Camel (camel-1) thread #30 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 13 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined

INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 14 to topic XYZ

INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.AbstractCoordinator(info:341)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Discovered group coordinator servername (id: 2147482644 rack: null)
INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.ConsumerCoordinator(info:341)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Revoking previously assigned partitions []
INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.AbstractCoordinator(info:336)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] (Re-)joining group
INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.AbstractCoordinator(info:341)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Successfully joined group with generation 1
INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.ConsumerCoordinator(info:341)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Setting newly assigned partitions [XYZ-17, XYZ-19, XYZ-13, XYZ-15, XYZ-25, XYZ-27, XYZ-21, XYZ-23, XYZ-1, XYZ-3, XYZ-28, XYZ-9, XYZ-11, XYZ-5, XYZ-7, XYZ-16, XYZ-18, XYZ-12, XYZ-14, XYZ-24, XYZ-26, XYZ-20, XYZ-22, XYZ-0, XYZ-2, XYZ-29, XYZ-8, XYZ-10, XYZ-4, XYZ-6]

Любая идея, почему верблюд не работает с определенным счетчиком, скорее работает только с одним потребителем.Это влияет на скорость обработки приложения-потребителя, когда нагрузка больше.

Любая помощь или предложение, что здесь происходит не так?

Я ожидаю, что после сбоя будет запущено 15 потоков пользователя, как это определенов конфиге верблюжьей кафки.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...