Вот пример того, как создавать сообщения для Kafka в Scala:
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
val kafkaProducerProps: Properties = {
val props = new Properties()
props.put("bootstrap.servers", "x.data.edh:6667")
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[StringSerializer].getName)
props
}
val producer = new KafkaProducer[String, String](kafkaProducerProps)
producer.send(new ProducerRecord[String, String]("myTopic", keyMessage._1, keyMessage._2))
Если вы хотите выполнять потоковую передачу, я рекомендую взглянуть на Руководство по интеграции Spark + Kafka .
Обратите внимание, что приведенный выше пример - это KafkaProducer в режиме fire-and-Forgot . Существуют следующие способы использования Kafka Producer:
Огонь и забывай Мы отправляем сообщение на сервер, и нам все равно, будет ли оно успешно доставлено или не. Большую часть времени он будет доставлен успешно, так как Kafka является высокодоступным, и производитель автоматически повторяет отправку сообщений. Однако некоторые сообщения будут потеряны при использовании этого метода.
Синхронная отправка Мы отправляем сообщение, метод send()
возвращает объект Future, и мы используем get()
для ожидания в будущем и посмотрим, был ли send()
успешным или нет.
Асинхронная отправка Мы вызываем метод send()
с функцией callback
, которая запускается при получении ответа от брокер Kafka
Синхронный пример
producer.send(record).get()
Асинхронный пример
producer.send(record, new compareProducerCallback)
producer.flush()
// Callback trait only contains the one abstract method onCompletion
private class compareProducerCallback extends Callback {
@Override
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
if (exception != null) {
exception.printStackTrace()
}
}
}