Как исправить kafka.common.errors.TimeoutException: Срок действия истек 1 запись (и), xxx мс прошло с момента создания пакета плюс время ожидания - PullRequest
0 голосов
/ 28 июня 2019

Я использую kafka_2.11-2.1.1 и Производитель использует пружину 2.1.0.RELEASE.

Я использую Spring, когда я отправляю сообщения в тему Kafka, мой производитель генерирует много TimeoutExceptions

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for COMPANY_INBOUND--19: 229 ms has passed since batch creation plus linger time

Я использую настройки производителя ниже kafka

acks: 1
retries: 1
batchSize: 100
lingerMs: 5
bufferMemory: 33554432
requestTimeoutMs: 60

Я пробовал много комбинаций (особенно batchSize & lingerMs), но ничего не получалось. Любая помощь, пожалуйста, какие должны быть настройки для вышеуказанного сценария.

Попробовал еще раз с приведенными ниже конфигами ... но не повезло, та же ошибка

acks = 1
    batch.size = 15
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class com.spgmi.ca.prescore.partition.CompanyInfoPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 120
    retries = 1

Второй запуск:

Я пробовал разные комбинации, ничего не получалось. Поэтому я подумал, что это будет проблема с сетью, SSL и т. Д. Поэтому я установил и запустил Kafka на той же машине, на которой работает производитель, т.е. на моем локальном компьютере.

Я снова попытался запустить продюсера, указывая на локальные темы Кафки. Но не повезло, тот же вопрос.

Ниже приведены параметры конфигурации.

2019-07-02 05:55:36.663  INFO 9224 --- [lt-dispatcher-2] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 0
    bootstrap.servers = [localhost:9092]
    request.timeout.ms = 60
    retries = 1
    buffer.memory = 33554432
    linger.ms = 0
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = []
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null

С той же ошибкой: org.apache.kafka.common.errors.TimeoutException: срок действия 1 записей для inbound_topic - 1: 69 мс прошло с момента создания пакета плюс время ожидания

тоже пробовал batch.size 5, 10 & 0 linger_ms 0, 5, 10 и т. д. request_time_out 0, 45, 60, 120, 300 и т. д.

Ничего не работает ... та же ошибка.

Что еще мне следует попробовать, какое может быть решение?

Как избежать генерации отрицательного ключа

Да, я настроил локальную настройку и распечатал журнал с информацией о разделе, которая показана ниже

2019-07-03 02: 48: 28.822 INFO 7092 --- [lt-dispatcher-2] cscppCompanyInfoPartitioner: Тема: ключ inbound_topic = 597736248- Entropy Cayman Solar Ltd.-null-null-null Раздел = -1 2019-07-03 02: 48: 28.931 ОШИБКА 7092 --- [объявление | provider-1] osksupport.LoggingProducerListener: Возникает исключение при отправке сообщения с ключом = '597736248- Entropy Cayman Solar Ltd.-null-null-null' и payload='com.spgmi.ca.prescore.model.Company@8b12343 к теме inbound_topic:

org.apache.kafka.common.errors.TimeoutException: срок действия 1 записей для inbound_topic - 1: 104 мсек прошло с момента создания пакета плюс время ожидания

Мои темы inbound_topic имеют два раздела, как вы видите ниже C: \ Software \ kafka \ kafka_2.11-2.1.1 \ bin \ windows> kafka-themes.bat --describe --zookeeper localhost: 2181 --topic inbound_topic Тема: inbound_topic PartitionCount: 2 ReplicationFactor: 1 Конфиги: Тема: inbound_topic Раздел: 0 Лидер: 0 Реплики: 0 Isr: 0 Тема: inbound_topic Раздел: 1 Лидер: 0 Реплики: 0 Isr: 0

Но мой продюсер, похоже, пытается отправить в Раздел = =.

Моя логика разбиения такая, как показано ниже

int p = (((String)key).hashCode() * Integer.MAX_VALUE) % numPartitions;
        logger.info("Topic : "+ topic + "\t Key = " + (String)key + " Partition = " + p );

На ключе я делаю hashCode (). Что нужно исправить здесь, чтобы избежать отрицательного номера раздела? то есть раздел = -1

Какой должна быть логика ключа моего раздела?

любая помощь высоко ценится.

Ответы [ 2 ]

1 голос
/ 28 июня 2019

Ошибка указывает, что некоторые записи помещаются в очередь с большей скоростью, чем они могут быть отправлены с клиента.

Когда ваш производитель отправляет сообщения, они сохраняются в буфере (перед отправкой их целевому посреднику), и записи группируются в пакеты для увеличения пропускной способности. Когда новая запись добавляется в пакет, она должна быть отправлена ​​в пределах окна -configurable- time, которое управляется request.timeout.ms (по умолчанию установлено значение 30 секунд). Если пакет находится в очереди в течение более длительного времени, выдается TimeoutException, и записи пакета затем удаляются из очереди и не доставляются посреднику.

Увеличение значения request.timeout.ms должно помочь вам.

Если это не работает, вы также можете попробовать уменьшить значение batch.size, чтобы пакеты отправлялись чаще (но на этот раз будет меньше сообщений), и убедитесь, что для linger.ms установлено значение 0 (по умолчанию) значение).

Обратите внимание, что вам нужно перезапустить брокеров kafka после изменения любого параметра конфигурации.

Если вы все еще получаете сообщение об ошибке, я предполагаю, что с вашей сетью что-то не так. Вы включили SSL?

0 голосов
/ 03 июля 2019

Я исправил предыдущую проблему, используя возвращаемый правильный номер раздела.

...