В моем потоковом приложении Scala (2.11) я потребляю данные из одной очереди в IBM MQ и записываю их в раздел Kafka с одним разделом. После использования данных из MQ полезная нагрузка сообщения разделяется на 3000 меньших сообщений, которые хранятся в последовательности строк. Затем каждое из этих 3000 сообщений отправляется в Kafka (версия 2.x) с помощью KafkaProducer.
Как бы вы отправили эти 3000 сообщений?
Я не могу увеличить количество очередей в IBMMQ (не под моим контролем), а также количество разделов в теме (требуется упорядочение сообщений, а написание пользовательского разделителя повлияет на слишком много потребителей темы).
В данный момент настройки источника:
- acks = 1
- linger.ms = 0
- batch.size = 65536
Но их оптимизация, вероятно, является вопросомЭто не моя часть моей нынешней проблемы.
В настоящее время я просто занимаюсь
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
private lazy val kafkaProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](someProperties)
val messages: Seq[String] = Seq(String1, …, String3000)
for (msg <- messages) {
val future = kafkaProducer.send(new ProducerRecord[String, String](someTopic, someKey, msg))
val recordMetadata = future.get()
}
И для меня это выглядит не самым элегантным и наиболее эффективным способом. Есть ли программный способ увеличить пропускную способность без изменения конфигурации / переключения на Spark Streaming и т. Д., А просто путем написания лучшего кода Scala? Может быть, Threads помогут?
Я новичок в Scala, так что пока не очень знаком с оптимизацией производительности и параллелизмом.
редактировать после ответа @ radai
Спасибок ответу, указывающему мне правильное направление, я более подробно рассмотрел различные методы продюсера. В книге Kafka - The Definitive Guide (O'Reilly) перечислены эти методы:
Запусти и забудь Мы отправляем сообщение на сервер, и нам все равно,это прибывает успешно или нет. Большую часть времени он будет доставлен успешно, так как Kafka является высокодоступным, и производитель автоматически повторяет отправку сообщений. Однако некоторые сообщения будут потеряны при использовании этого метода.
Синхронная отправка Мы отправляем сообщение, метод send () возвращает объект Future, и мы используем get () для ожиданияв будущем и посмотрим, была ли успешной отправка () или нет.
Асинхронная отправка Мы вызываем метод send () с помощью функции обратного вызова, которая запускается при получении ответа от Kafka. broker
И теперь мой код выглядит следующим образом (без учета обработки ошибок и простого определения класса Callback):
val asyncProducer = new KafkaProducer[String, String](someProperties)
for (msg <- messages) {
val record = new ProducerRecord[String, String](someTopic, someKey, msg)
asyncProducer.send(record, new compareProducerCallback)
}
asyncProducer.flush()
Я сравнил все методы для 10000 очень маленькихСообщения. Вот мой результат измерения:
Запустить и забыть: 173683464ns
Синхронная отправка: 29195039875ns
Асинхронная отправка: 44153826ns
Честно говоря, вероятно, есть больший потенциал для оптимизации всех их путем выбора правильных свойств (batch.size, linger.ms, ... ). Тем не менее, это уже отражает потенциал, я думаю. Спасибо за помощь @radai!