Как написать Kafka Producer в Scala - PullRequest
0 голосов
/ 18 апреля 2020

Мне нужна помощь в публикации сообщения для topi c с использованием kafka Manufacturer. Мой клиент-производитель kafka написан на scala, работает поверх спарка.

Моя работа успешно выполняется, но, похоже, мое сообщение не опубликовано.

Вот код

val response = info.producer.asInstanceOf[KafkaProducer[K, V]].send(new ProducerRecord(info.props.getProperty(s"$topicNickName.topic"), keyMessage._1, keyMessage._2))

Значения конфигурации производителя

metric.reporters = []
    metadata.max.age.ms = 300000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [x.data.edh:6667, y.data.edh:6667, z.data.edh:6667, a.data.edh:6667, b.data.edh:6667]
    ssl.keystore.type = JKS
    sasl.mechanism = GSSAPI
    max.block.ms = 60000
    interceptor.classes = null
    ssl.truststore.password = null
    client.id = 
    ssl.endpoint.identification.algorithm = null
    request.timeout.ms = 30000
    acks = 1
    receive.buffer.bytes = 32768
    ssl.truststore.type = JKS
    retries = 0
    ssl.truststore.location = null
    ssl.keystore.password = null
    send.buffer.bytes = 131072
    compression.type = none
    metadata.fetch.timeout.ms = 60000
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    batch.size = 16384
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    linger.ms = 0

Как отладить проблему?

1 Ответ

1 голос
/ 18 апреля 2020

Вот пример того, как создавать сообщения для Kafka в Scala:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

val kafkaProducerProps: Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", "x.data.edh:6667")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    props
  }

val producer = new KafkaProducer[String, String](kafkaProducerProps)
producer.send(new ProducerRecord[String, String]("myTopic", keyMessage._1, keyMessage._2))

Если вы хотите выполнять потоковую передачу, я рекомендую взглянуть на Руководство по интеграции Spark + Kafka .

Обратите внимание, что приведенный выше пример - это KafkaProducer в режиме fire-and-Forgot . Существуют следующие способы использования Kafka Producer:

Огонь и забывай Мы отправляем сообщение на сервер, и нам все равно, будет ли оно успешно доставлено или не. Большую часть времени он будет доставлен успешно, так как Kafka является высокодоступным, и производитель автоматически повторяет отправку сообщений. Однако некоторые сообщения будут потеряны при использовании этого метода.

Синхронная отправка Мы отправляем сообщение, метод send() возвращает объект Future, и мы используем get() для ожидания в будущем и посмотрим, был ли send() успешным или нет.

Асинхронная отправка Мы вызываем метод send() с функцией callback, которая запускается при получении ответа от брокер Kafka

Синхронный пример

producer.send(record).get()

Асинхронный пример

producer.send(record, new compareProducerCallback)
producer.flush()

// Callback trait only contains the one abstract method onCompletion
private class compareProducerCallback extends Callback {
  @Override
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) {
      exception.printStackTrace()
    }
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...