Как эффективно отправлять сообщения из Seq [String] в тему Kafka - PullRequest
0 голосов
/ 16 октября 2019

В моем потоковом приложении Scala (2.11) я потребляю данные из одной очереди в IBM MQ и записываю их в раздел Kafka с одним разделом. После использования данных из MQ полезная нагрузка сообщения разделяется на 3000 меньших сообщений, которые хранятся в последовательности строк. Затем каждое из этих 3000 сообщений отправляется в Kafka (версия 2.x) с помощью KafkaProducer.

Как бы вы отправили эти 3000 сообщений?

Я не могу увеличить количество очередей в IBMMQ (не под моим контролем), а также количество разделов в теме (требуется упорядочение сообщений, а написание пользовательского разделителя повлияет на слишком много потребителей темы).

В данный момент настройки источника:

  • acks = 1
  • linger.ms = 0
  • batch.size = 65536

Но их оптимизация, вероятно, является вопросомЭто не моя часть моей нынешней проблемы.

В настоящее время я просто занимаюсь

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

private lazy val kafkaProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](someProperties)
val messages: Seq[String] = Seq(String1, …, String3000)
for (msg <- messages) {
    val future = kafkaProducer.send(new ProducerRecord[String, String](someTopic, someKey, msg))
    val recordMetadata = future.get()
}

И для меня это выглядит не самым элегантным и наиболее эффективным способом. Есть ли программный способ увеличить пропускную способность без изменения конфигурации / переключения на Spark Streaming и т. Д., А просто путем написания лучшего кода Scala? Может быть, Threads помогут?

Я новичок в Scala, так что пока не очень знаком с оптимизацией производительности и параллелизмом.


редактировать после ответа @ radai

Спасибок ответу, указывающему мне правильное направление, я более подробно рассмотрел различные методы продюсера. В книге Kafka - The Definitive Guide (O'Reilly) перечислены эти методы:

Запусти и забудь Мы отправляем сообщение на сервер, и нам все равно,это прибывает успешно или нет. Большую часть времени он будет доставлен успешно, так как Kafka является высокодоступным, и производитель автоматически повторяет отправку сообщений. Однако некоторые сообщения будут потеряны при использовании этого метода.

Синхронная отправка Мы отправляем сообщение, метод send () возвращает объект Future, и мы используем get () для ожиданияв будущем и посмотрим, была ли успешной отправка () или нет.

Асинхронная отправка Мы вызываем метод send () с помощью функции обратного вызова, которая запускается при получении ответа от Kafka. broker

И теперь мой код выглядит следующим образом (без учета обработки ошибок и простого определения класса Callback):

  val asyncProducer = new KafkaProducer[String, String](someProperties)

  for (msg <- messages) {
    val record = new ProducerRecord[String, String](someTopic, someKey, msg)
    asyncProducer.send(record, new compareProducerCallback)
  }
  asyncProducer.flush()

Я сравнил все методы для 10000 очень маленькихСообщения. Вот мой результат измерения:

  1. Запустить и забыть: 173683464ns

  2. Синхронная отправка: 29195039875ns

  3. Асинхронная отправка: 44153826ns

Честно говоря, вероятно, есть больший потенциал для оптимизации всех их путем выбора правильных свойств (batch.size, linger.ms, ... ). Тем не менее, это уже отражает потенциал, я думаю. Спасибо за помощь @radai!

1 Ответ

0 голосов
/ 17 октября 2019

Самая большая причина, по которой я вижу, что ваш код работает медленно, заключается в том, что вы ожидаете каждого будущего отправки.

Кафка была разработана для отправки пакетов. отправляя одну запись за раз, вы ожидаете двустороннего приема для каждой отдельной записи, и вы не получаете никакой выгоды от сжатия.

«идиоматическая» вещь, которую нужно сделать, это отправить все, а затем заблокировать всерезультирующие фьючерсы во 2-м цикле.

также, если вы намереваетесь это сделать, я бы поднялся и задержался (в противном случае ваша 1-ая запись приведет к партии первого размера, что замедлит вас в целом. см. https://en.wikipedia.org/wiki/Nagle%27s_algorithm) и вызовите flush () на производителе, как только ваш цикл отправки завершится.

...