Конфигурация производителя Kafka для приема данных - PullRequest
0 голосов
/ 08 мая 2018

В настоящее время я использую наш конвейер анализа данных на основе Kafka для загрузки огромных файлов журналов из исходной системы.Во время проглатывания я не использую какие-либо задержки / паузы в продюсере, я просто читаю из файлов журнала (содержащих JSON) и отправляю их Кафке.Мой продюсер имеет следующую конфигурацию.

metric.reporters = []
metadata.max.age.ms = 300000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [1.2.3.184:9092, 1.2.3.185:9092, 1.2.3.186:9092]
ssl.keystore.type = JKS
sasl.mechanism = GSSAPI
max.block.ms = 60000
interceptor.classes = null
ssl.truststore.password = null
client.id = producer-1
ssl.endpoint.identification.algorithm = null
request.timeout.ms = 30000
acks = all
receive.buffer.bytes = 32768
ssl.truststore.type = JKS
retries = 0
ssl.truststore.location = null
ssl.keystore.password = null
send.buffer.bytes = 131072
compression.type = none
metadata.fetch.timeout.ms = 60000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
batch.size = 16384
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
linger.ms = 1

При такой конфигурации я теряю почти 1/3 сообщений со следующей ошибкой

7:27:14.053 [kafka-producer-network-thread | producer-1] ERROR com.abc.telemetry.service.KafkaService - Batch containing 39 record(s) expired due to timeout while requesting metadata from brokers for

Я планирую обновить linger.ms, batch.size для своего использования.дело.Есть ли что-нибудь еще, что я могу настроить, чтобы включить этот конвейер приема без потери данных?

Приветствия!

1 Ответ

0 голосов
/ 08 мая 2018

Я видел несколько вопросов об отправке огромных данных через Кафку. Поэтому я хотел бы попробовать это.

Кафка не предназначена для отправки огромных полезных нагрузок / сообщений. Вы должны видеть это как распределенную шину сообщений, которая дает вам все привилегии распределенной системы.

Kafka ограничивает размер сообщений, которые могут быть отправлены по следующим причинам

  • Огромные сообщения увеличивают нагрузку на память в брокере.
  • Большие сообщения замедляют работу брокера, и их обработка обходится очень дорого.

Решение:

  • Вы могли бы очень хорошо использовать Reference Based Messaging, куда вы отправляете расположение огромного сообщения для потребителя, а не отправка огромные данные как есть. Это позволит вам использовать возможности внешнее хранилище данных, а также снижает давление на Кафку Брокеры.
  • Вы также можете chunk данные и отправить их в линию и re-assemble в приемнике.
...