Я использую kafka_2.11-2.1.1
и Производитель использует пружину 2.1.0.RELEASE.
Я использую Spring, когда я отправляю сообщения в тему Kafka, мой производитель генерирует много TimeoutExceptions
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for COMPANY_INBOUND--19: 229 ms has passed since batch creation plus linger time
Я использую настройки производителя ниже kafka
acks: 1
retries: 1
batchSize: 100
lingerMs: 5
bufferMemory: 33554432
requestTimeoutMs: 60
Я пробовал много комбинаций (особенно batchSize
& lingerMs
), но ничего не получалось. Любая помощь, пожалуйста, какие должны быть настройки для вышеуказанного сценария.
Попробовал еще раз с приведенными ниже конфигами ... но не повезло, та же ошибка
acks = 1
batch.size = 15
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class com.spgmi.ca.prescore.partition.CompanyInfoPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120
retries = 1
Второй запуск:
Я пробовал разные комбинации, ничего не получалось.
Поэтому я подумал, что это будет проблема с сетью, SSL и т. Д.
Поэтому я установил и запустил Kafka на той же машине, на которой работает производитель, т.е. на моем локальном компьютере.
Я снова попытался запустить продюсера, указывая на локальные темы Кафки.
Но не повезло, тот же вопрос.
Ниже приведены параметры конфигурации.
2019-07-02 05:55:36.663 INFO 9224 --- [lt-dispatcher-2] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 0
bootstrap.servers = [localhost:9092]
request.timeout.ms = 60
retries = 1
buffer.memory = 33554432
linger.ms = 0
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
С той же ошибкой:
org.apache.kafka.common.errors.TimeoutException: срок действия 1 записей для inbound_topic - 1: 69 мс прошло с момента создания пакета плюс время ожидания
тоже пробовал
batch.size 5, 10 & 0
linger_ms 0, 5, 10 и т. д.
request_time_out 0, 45, 60, 120, 300 и т. д.
Ничего не работает ... та же ошибка.
Что еще мне следует попробовать, какое может быть решение?
Как избежать генерации отрицательного ключа
Да, я настроил локальную настройку и распечатал журнал с информацией о разделе, которая показана ниже
2019-07-03 02: 48: 28.822 INFO 7092 --- [lt-dispatcher-2] cscppCompanyInfoPartitioner: Тема: ключ inbound_topic = 597736248- Entropy Cayman Solar Ltd.-null-null-null Раздел = -1
2019-07-03 02: 48: 28.931 ОШИБКА 7092 --- [объявление | provider-1] osksupport.LoggingProducerListener: Возникает исключение при отправке сообщения с ключом = '597736248- Entropy Cayman Solar Ltd.-null-null-null' и payload='com.spgmi.ca.prescore.model.Company@8b12343 к теме inbound_topic:
org.apache.kafka.common.errors.TimeoutException: срок действия 1 записей для inbound_topic - 1: 104 мсек прошло с момента создания пакета плюс время ожидания
Мои темы inbound_topic имеют два раздела, как вы видите ниже
C: \ Software \ kafka \ kafka_2.11-2.1.1 \ bin \ windows> kafka-themes.bat --describe --zookeeper localhost: 2181 --topic inbound_topic
Тема: inbound_topic PartitionCount: 2 ReplicationFactor: 1 Конфиги:
Тема: inbound_topic Раздел: 0 Лидер: 0 Реплики: 0 Isr: 0
Тема: inbound_topic Раздел: 1 Лидер: 0 Реплики: 0 Isr: 0
Но мой продюсер, похоже, пытается отправить в Раздел = =.
Моя логика разбиения такая, как показано ниже
int p = (((String)key).hashCode() * Integer.MAX_VALUE) % numPartitions;
logger.info("Topic : "+ topic + "\t Key = " + (String)key + " Partition = " + p );
На ключе я делаю hashCode (). Что нужно исправить здесь, чтобы избежать отрицательного номера раздела? то есть раздел = -1
Какой должна быть логика ключа моего раздела?
любая помощь высоко ценится.