Scala: невозможно отправить сообщение Kafka (размещено на удаленном сервере) - PullRequest
0 голосов
/ 06 марта 2019

Я использую Scala 2.12 и мне требуются библиотеки для преобразования сообщений в Avro (необходимо преобразовать) и в клиенты kafka.

Я выполняю код на хосте Linux (dev), на котором запущено другое приложение (Apache NiFi), и я могу создать KafkaProducer и опубликовать сообщение на удаленном Kafka.

Поскольку сейчас это dev, протокол PLAINTEXT.

например. конфига KafkaProducer в Нифи.

acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 5000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

Кроме того, NiFi запускается с опцией java для использования файла JAAS, содержимое которого:

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   principal="myUserName@myRealm"
   useKeyTab=true
   client=true
   keyTab="/path/myfile.keytab"
   serviceName="kafka";
};

Также доступен файл krb5.conf, который используется.

Используя вышеуказанный конфиг, NiFi может создавать KafkaProducer и отправлять сообщения через него.

Теперь я использую то же самое с кодом Scala. Простой класс, который использует следующие build.sbt и код для отправки сообщения.

build.sbt:

// https://mvnrepository.com/artifact/org.apache.avro/avro
libraryDependencies += "org.apache.avro" % "avro" % "1.8.1"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1"

libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.6.4"

fork in run := true

javaOptions += "-Djava.security.auth.login.config=/path/to/jaas/kafka-jaas.conf"
javaOptions += "-Djava.security.krb5.conf=/path/to/krb/krb5.conf"

Мой код для отправки сообщения. Удалены нежелательные строки для краткости. Обратите внимание, что тестирование создания данных в Avro выполняется нормально. То же сообщение, когда оно дается NiFi, оно может правильно опубликовать в теме. Что не работает, так это публикация на kafka с использованием Scala.

код:

package example

import java.io.ByteArrayOutputStream
import java.util
import java.io.File
import java.util.{Properties, UUID}
import org.apache.avro.Schema.Parser

import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import scala.io.Source
import scala.io.StdIn


object Hello extends Greeting with App {

  // case classes for creating avro record
  // This part works fine.

  val schemaFile = "/path/Schema.avsc"

  val schema = new Schema.Parser().parse(new File(schemaFile))

  val reader = new GenericDatumReader[GenericRecord](schema)

  val avroRecord = new GenericData.Record(schema)
  // populate correctly the record.
  // works fine.

  val brokers = "server1.domain:9096,server2.domain:9096,server3.domain:9096"
  val topic = "myTopic"
  private def configuration: Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put("security.protocol", "PLAINTEXT")
    props.put("sasl.kerberos.service.name", "kafka")
    props.put("acks", "all")
    props.put("retries","0")
    props
  }


  val producer = new KafkaProducer[String, Array[Byte]](configuration)
  val writer = new SpecificDatumWriter[GenericRecord](schema)
  val out = new ByteArrayOutputStream()
  val encoder = EncoderFactory.get.binaryEncoder(out, null)
  writer.write(avroRecord, encoder)
  encoder.flush()
  out.close()
  val serializedBytes: Array[Byte] = out.toByteArray()

  val recordToSend = new ProducerRecord[String, Array[Byte]](topic, serializedBytes)
  producer.send(recordToSend)


}

trait Greeting {
  lazy val greeting: String = "hello"
}

Когда я запускаю его в командной строке sbt:

свт чистый

sbt compile

сб. Беги

Я получаю следующую ошибку / вывод. Ничего не опубликовано.

Выход:

-bash-4.2$ sbt run
[warn] Executing in batch mode.
[warn]   For better performance, hit [ENTER] to switch to interactive mode, or
[warn]   consider launching sbt without any commands, or explicitly passing 'shell'
[info] Loading project definition from /path/Scala/hello-world/project
[info] Set current project to hello-world (in build file:/path/Scala/hello-world/)
[info] Running example.Hello
[info] hello
[info] 
[error] 9 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
[error]         acks = 1
[error]         batch.size = 16384
[error]         bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
[error]         buffer.memory = 33554432
[error]         client.dns.lookup = default
[error]         client.id =
[error]         compression.type = none
[error]         connections.max.idle.ms = 540000
[error]         delivery.timeout.ms = 120000
[error]         enable.idempotence = false
[error]         interceptor.classes = []
[error]         key.serializer = class org.apache.kafka.common.serialization.StringSerializer
[error]         linger.ms = 0
[error]         max.block.ms = 60000
[error]         max.in.flight.requests.per.connection = 5
[error]         max.request.size = 1048576
[error]         metadata.max.age.ms = 300000
[error]         metric.reporters = []
[error]         metrics.num.samples = 2
[error]         metrics.recording.level = INFO
[error]         metrics.sample.window.ms = 30000
[error]         partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
[error]         receive.buffer.bytes = 32768
[error]         reconnect.backoff.max.ms = 1000
[error]         reconnect.backoff.ms = 50
[error]         request.timeout.ms = 30000
[error]         retries = 0
[error]         retry.backoff.ms = 100
[error]         sasl.client.callback.handler.class = null
[error]         sasl.jaas.config = null
[error]         sasl.kerberos.kinit.cmd = /usr/bin/kinit
[error]         sasl.kerberos.min.time.before.relogin = 60000
[error]         sasl.kerberos.service.name = kafka
[error]         sasl.kerberos.ticket.renew.jitter = 0.05
[error]         sasl.kerberos.ticket.renew.window.factor = 0.8
[error]         sasl.login.callback.handler.class = null
[error]         sasl.login.class = null
[error]         sasl.login.refresh.buffer.seconds = 300
[error]         sasl.login.refresh.min.period.seconds = 60
[error]         sasl.login.refresh.window.factor = 0.8
[error]         sasl.login.refresh.window.jitter = 0.05
[error]         sasl.mechanism = GSSAPI
[error]         security.protocol = PLAINTEXT
[error]         send.buffer.bytes = 131072
[error]         ssl.cipher.suites = null
[error]         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
[error]         ssl.endpoint.identification.algorithm =
[error]         ssl.key.password = null
[error]         ssl.keymanager.algorithm = SunX509
[error]         ssl.keystore.location = null
[error]         ssl.keystore.password = null
[error]         ssl.keystore.type = JKS
[error]         ssl.protocol = TLS
[error]         ssl.provider = null
[error]         ssl.secure.random.implementation = null
[error]         ssl.trustmanager.algorithm = PKIX
[error]         ssl.truststore.location = null
[error]         ssl.truststore.password = null
[error]         ssl.truststore.type = JKS
[error]         transaction.timeout.ms = 60000
[error]         transactional.id = null
[error]         value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[error]
[error] 109 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
[error] 109 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[error] 248 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 5NMDh7lDS-SxXpgprjR6oA
[success] Total time: 1 s, completed Mar 6, 2019 1:38:14 PM

Я уверен, что это связано с безопасностью или Kerberos. Но другие приложения могут отправлять сообщения, не используя мой код scala.

UPDATE

Основываясь на ответе @tgrez, я попытался заблокировать с помощью Future get.

 //producer.send(recordToSend)
    val metaF: Future[RecordMetadata] = producer.send(recordToSend)
    val meta = metaF.get() //blocking
    val msgLog =
    s"""
       |offset = ${meta.offset()}
       |partition = ${meta.partition()}
       |topic = ${meta.topic()}
     """.stripMargin
    println(msgLog)
    producer.close()

Однако я все еще похожая ошибка.

[error] 10 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
[error]         acks = 1
[error]         batch.size = 16384
[error]         bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
[error]         buffer.memory = 33554432
[error]         client.dns.lookup = default
[error]         client.id =
[error]         compression.type = none
[error]         connections.max.idle.ms = 540000
[error]         delivery.timeout.ms = 120000
[error]         enable.idempotence = false
[error]         interceptor.classes = []
[error]         key.serializer = class org.apache.kafka.common.serialization.StringSerializer
[error]         linger.ms = 0
[error]         max.block.ms = 60000
[error]         max.in.flight.requests.per.connection = 5
[error]         max.request.size = 1048576
[error]         metadata.max.age.ms = 300000
[error]         metric.reporters = []
[error]         metrics.num.samples = 2
[error]         metrics.recording.level = INFO
[error]         metrics.sample.window.ms = 30000
[error]         partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
[error]         receive.buffer.bytes = 32768
[error]         reconnect.backoff.max.ms = 1000
[error]         reconnect.backoff.ms = 50
[error]         request.timeout.ms = 30000
[error]         retries = 0
[error]         retry.backoff.ms = 100
[error]         sasl.client.callback.handler.class = null
[error]         sasl.jaas.config = null
[error]         sasl.kerberos.kinit.cmd = /usr/bin/kinit
[error]         sasl.kerberos.min.time.before.relogin = 60000
[error]         sasl.kerberos.service.name = kafka
[error]         sasl.kerberos.ticket.renew.jitter = 0.05
[error]         sasl.kerberos.ticket.renew.window.factor = 0.8
[error]         sasl.login.callback.handler.class = null
[error]         sasl.login.class = null
[error]         sasl.login.refresh.buffer.seconds = 300
[error]         sasl.login.refresh.min.period.seconds = 60
[error]         sasl.login.refresh.window.factor = 0.8
[error]         sasl.login.refresh.window.jitter = 0.05
[error]         sasl.mechanism = GSSAPI
[error]         security.protocol = PLAINTEXT
[error]         send.buffer.bytes = 131072
[error]         ssl.cipher.suites = null
[error]         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
[error]         ssl.endpoint.identification.algorithm =
[error]         ssl.key.password = null
[error]         ssl.keymanager.algorithm = SunX509
[error]         ssl.keystore.location = null
[error]         ssl.keystore.password = null
[error]         ssl.keystore.type = JKS
[error]         ssl.protocol = TLS
[error]         ssl.provider = null
[error]         ssl.secure.random.implementation = null
[error]         ssl.trustmanager.algorithm = PKIX
[error]         ssl.truststore.location = null
[error]         ssl.truststore.password = null
[error]         ssl.truststore.type = JKS
[error]         transaction.timeout.ms = 60000
[error]         transactional.id = null
[error]         value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[error]
[error] 110 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
[error] 110 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[error] 249 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 5NMDh7lDS-SxXpgprjR6oA
[info]
[info] offset = 8
[info] partition = 1
[info] topic = myTopic
[info]
[error] 323 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[success] Total time: 1 s, completed Mar 6, 2019 3:26:53 PM

Что-то, чего мне здесь не хватает?

ОБНОВЛЕНИЕ 2:

Как упоминалось ниже, я изменил свой код. однако это тоже не работает. Я понял, что что-то не так в сериализации.

У меня уже есть avroRecord в формате GenericData.Record. Разве я не могу использовать то же самое для публикации данных в Kafka? Почему я должен использовать массив байтов или любой другой сериализатор для того же?

Единственный пример, который я нашел, - это использование io.confluent avro serializer. Но я не могу использовать это, поскольку sbt или maven не могут загрузить его сейчас. Infact URL: http://packages.confluent.io/maven/ не работает. Каким-то образом я скачал банки и использовал их в качестве внешних библиотек.

Изменен код на:

props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")

val producer = new KafkaProducer[String, GenericData.Record](configuration)

val recordToSend = new ProducerRecord[String, GenericData.Record](topic, avroRecord)

Теперь все работает нормально.

Однако я все еще ищу любой другой класс сериализатора (который доступен в Maven) для отправки сообщения как GenericData вместо массива байтов.

ОБНОВЛЕНИЕ 3:

В соответствии с предложением пользователя @KZapagol, я попытался использовать то же самое и получил следующую ошибку.

Схема: (Это сложно, поэтому нужна помощь, если я правильно преобразовываю данные)

{"type": "record","name": "MyPnl","doc": "This schema contains the metadata fields wrapped in a header field which follows the official schema.","fields": [{"name":"header","type":{"type":"record","name":"header","fields":[{"name":"messageId","type":"string"},{"name":"businessId","type":"string"},{"name":"batchId","type":"string"},{"name":"sourceSystem","type":"string"},{"name":"secondarySourceSystem","type":[ "null", "string" ]},{"name":"sourceSystemCreationTimestamp","type":"long","logicalType": "timestamp-millis"},{"name":"sentBy","type":"string"},{"name":"sentTo","type":"string"},{"name":"messageType","type":"string"},{"name":"schemaVersion","type":"string"},{"name":"processing","type":"string"},{"name":"recordOffset","type":[ "null", "string" ]}]}},{"name":"pnlData","type":{"type":"record","name":"pnlData","fields":[{"name":"pnlHeader","type":{"type":"record","name":"pnlData","namespace":"pnlHeader","fields":[{"name":"granularity","type":"string"},{"name":"pnlType","type":"string"},{"name":"pnlSubType","type":"string"},{"name":"businessDate","type":"string","logicalType": "date"},{"name":"bookId","type":"string"},{"name":"bookDescription","type":"string"},{"name":"pnlStatus","type":"string"}]}},{"name":"pnlBreakDown","type":{"type":"array","items":{"type":"record","name":"pnlData","namespace":"pnlBreakDown","fields":[{"name":"category","type":[ "null", "string" ]},{"name":"subCategory","type":[ "null", "string" ]},{"name":"riskCategory","type":[ "null", "string" ]},{"name":"pnlCurrency","type":"string"},{"name":"pnlDetails", "type":{"type":"array","items": {"type":"record","name":"pnlData","namespace":"pnlDetails","fields":[{"name":"pnlLocalAmount","type":"double"},{"name":"pnlCDEAmount","type":"double"}]}}}]}}}]}}]}

У меня есть соответствующие классы случаев для выше. (Пожалуйста, предложите, если я что-то здесь пропустил?)

case class MessageHeader( messageId: String,
                   businessId: String,
                   batchId: String,
                   sourceSystem: String,
                   secondarySourceSystem: String,
                   sourceSystemCreationTimestamp: Long,
                   sentBy: String,
                   sentTo: String,
                   messageType: String,
                   schemaVersion: String,
                   processing: String,
                   recordOffset: String
                 )

case class PnlHeader (  granularity: String,
                        pnlType: String,
                        pnlSubType: String,
                        businessDate: String,
                        bookId: String,
                        bookDescription: String,
                        pnlStatus: String
                       )

case class PnlDetails (  pnlLocalAmount: Double,
                         pnlCDEAmount: Double
                        )

case class PnlBreakdown (  category: String,
                           subCategory: String,
                           riskCategory: String,
                           pnlCurrency: String,
                           pnlDetails: List[PnlDetails]
                          )

case class PnlData ( pnlHeader: PnlHeader, pnlBreakdown: List[PnlBreakdown] )

case class PnlRecord (header: MessageHeader, pnlData: PnlData )

Я смоделировал свои данные в вышеуказанном формате PnlRecord. У меня есть список таких записей.

Из списка таких записей я перебираю и пытаюсь опубликовать его в Kafka.

 // Create Producer
    val producer = new KafkaProducer[String, Array[Byte]](properties)

 // This filename is file where above schema is saved.
    val avroJsonSchema = Source.fromFile(new File(schemaFileName)).getLines.mkString
    val avroMessage = new AvroMessage(avroJsonSchema)
    val avroRecord = new Record(avroMessage.schema)

// recordListToSend is of type: List[PnlRecord]
for (record <- recordListToSend) {
      avroRecord.put("header", record.header)
      avroRecord.put("pnlData", record.pnlData)
      //logger.info(s"Record: ${avroRecord}\n")
      avroMessage.gdw.write(avroRecord, EncoderFactory.get().binaryEncoder(avroMessage.baos, null))
      avroMessage.dfw.append(avroRecord)
      avroMessage.dfw.close()
      val bytes = avroMessage.baos.toByteArray

      // send data
      producer.send(new ProducerRecord[String, Array[Byte]](topic, bytes), new ProducerCallback)

      //flush data
      producer.flush()
      //flush and close producer
      producer.close()
    }

Класс AvroMessage (по предложению пользователя)

import java.io.ByteArrayOutputStream

import org.apache.avro
import org.apache.avro.Schema
import org.apache.avro.file.CodecFactory
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}


class AvroMessage(avroJsonSchema: String) {

  val parser = new Schema.Parser()
  val schema = parser.parse(avroJsonSchema)
  val baos = new ByteArrayOutputStream()
  val gdw = new GenericDatumWriter[GenericRecord](schema)
  val dfw = new avro.file.DataFileWriter[GenericRecord](gdw)
  val compressionLevel = 5
  dfw.setCodec(CodecFactory.deflateCodec(compressionLevel))
  dfw.create(schema, baos)

}

Я получаю сообщение об ошибке ниже:

2019-03-13 16:00:09.855 [application-akka.actor.default-dispatcher-11] ERROR controllers.SAController.$anonfun$publishToSA$2(34) - com.domain.sa.model.MessageHeader cannot be cast to org.apache.avro.generic.IndexedRecord
java.lang.ClassCastException: ca.domain.my.sa.model.MessageHeader cannot be cast to org.apache.avro.generic.IndexedRecord
        at org.apache.avro.generic.GenericData.getField(GenericData.java:697)
        at org.apache.avro.generic.GenericData.getField(GenericData.java:712)
        at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:164)
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
        at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
        at ca.domain.my.sa.dao.myPnlDao$.$anonfun$publishAvroToKafka$1(myPnlDao.scala:95)

Правильны ли мои исходные классы по схеме?

Мой класс дел MessageHeader показан выше.

Моя схема показана выше (обновлено).

Моя запись:

Record: {"header": Header(my_20190313180602_00000011,my_BookLevel_Daily_Regular_20181130_EMERGINGTRS,11_20181130_8259,my,null,65162584,my,SA,PnLMessage,test,RealTime,null), "pnlData": PnlData(PnlHeader(BookLevel,Daily,Regular,2018-11-30,8259,EMERGINGTRS,Locked),List(PnlBreakdown(null,null,null,eur,List(PnlDetails(0.0,0.0022547507286072))), PnlBreakdown(null,null,null,jpy,List(PnlDetails(0.0,0.0))), PnlBreakdown(null,null,null,usd,List(PnlDetails(0.19000003399301,0.642328574985149))), PnlBreakdown(null,null,null,brl,List(PnlDetails(2.65281414613128E-8,2.4107750505209E-5))), PnlBreakdown(null,null,null,gbp,List(PnlDetails(0.0,-5.05781173706088E-5))), PnlBreakdown(null,null,null,cad,List(PnlDetails(145.399999991953,145.399999991953)))))}

Ответы [ 2 ]

0 голосов
/ 07 марта 2019

Пожалуйста, обновите ваш код, как показано ниже, и попробуйте один раз.Похоже, вы не закрыли выходной поток, кодировщик и производитель должным образом.

val producer = new KafkaProducer[String, Array[Byte]](configuration)
  val writer = new SpecificDatumWriter[GenericRecord](schema)
  val out = new ByteArrayOutputStream()
  val encoder = EncoderFactory.get.binaryEncoder(out, null)
  writer.write(avroRecord, encoder)

  val serializedBytes: Array[Byte] = out.toByteArray()

  encoder.flush()
  out.close()


  val recordToSend = new ProducerRecord[String, Array[Byte]](topic, serializedBytes)
  producer.send(recordToSend,new ProducerCallback)

  //flush data
  producer.flush()
  //flush and close producer
  producer.close()



class ProducerCallback(implicit logger: Logger) extends Callback {

  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    //executes every time a record is successfully sent or exception thrown
    Option(metadata) match {
      case Some(_) =>
        logger.info("Received new metadata. \n" +
          "Topic: " + metadata.topic() + "\n" +
          "Partition: " + metadata.partition() + "\n" +
          "Offset: " + metadata.offset() + "\n" +
          "Timestamp: " + metadata.timestamp() + "\n" +
          "Checksum: " + metadata.checksum())
      case None => ;
    }
    Option(exception) match {
      case Some(_) =>
        logger.error("Exception thrown during processing of record... " + exception)
        throw exception
      case None => ;
    }
  }
}

Пожалуйста, обратитесь к ссылке https://github.com/Zapagol/apache-kafka/tree/master/src/main/scala/com/org/apache для получения дополнительных примеров производителей и потребителей kafka.Надеюсь, это поможет!

Обновление

Я добавил пример KafkaProducer для ввода Avroschema.Пожалуйста, обратитесь https://github.com/Zapagol/apache-kafka/blob/master/src/main/scala/com/org/apache/producers/ProducerForAvroschema.scala.

Я использовал apache avro jar и образец файла avsc, как показано ниже.Пожалуйста, измените файл схемы в соответствии с вашими требованиями. И я могу успешно произвести запись.

{
   "type": "record",
   "name": "employee",
   "fields": [
      {"name": "name", "type": "string"},
      {"name": "id", "type": "int"},
      {"name": "mobileNumber", "type": ["string", "null"]},
      {"name": "salary", "type": ["int", "null"]}
  ]
}
0 голосов
/ 06 марта 2019

Это может быть проще, чем кажется.Метод send является асинхронным, он возвращает Future<RecordMetadata>.Ваш пример завершается до того, как сообщение действительно отправлено.

Производитель Kafka пакетирует сообщения в фоновом режиме, поэтому для обеспечения отправки сообщений вам следует либо заблокировать, например, Future.get (это означает ожидание ответа брокера с помощьюметаданных) или убедитесь, что буферы сброшены с kafkaProducer.flush().

. В тестах я рекомендую блокировать на Future.

...