Связываете ли вы список классов с производителем кафки? - PullRequest
0 голосов
/ 02 апреля 2019

У меня есть следующий класс:

case class Alpakka(id:Int,name:String,animal_type:String)

Я пытаюсь связать список этих классов дел с производителем в kafka, используя следующий код:

  def connectEntriesToProducer(seq: Seq[Alpakka]) = {


    val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

seq.map(alpakka => new ProducerRecord[String, String]("alpakkas", alpakka.asJson.noSpaces))   
      .runWith(Producer.plainSink(producerSettings))
  }

Я использую circe для преобразования класса case в json. Однако я продолжаю получать сообщение об ошибке компилятора:

Error:(87, 34) type mismatch;
 found   : akka.stream.scaladsl.Sink[org.apache.kafka.clients.producer.ProducerRecord[String,String],scala.concurrent.Future[akka.Done]]
 required: org.apache.kafka.clients.producer.ProducerRecord[String,String] => ?
      .runWith(Producer.plainSink(producerSettings))

Я не уверен, что происходит!

1 Ответ

0 голосов
/ 09 апреля 2019

Вы пытаетесь построить Graph из Seq вместо Source.

Ваш метод connectEntriesToProducer должен выглядеть как

def connectEntriesToProducer(seq: Source[Alpakka]) = {

Примечание, Source вместо Seq.

В качестве альтернативы вы можете создать источник из Seq, но вам придется использовать immutable.Seq, поскольку Source.apply будет принимать только неизменяемый итератор.

def connectEntriesToProducer(seq: scala.collection.immutable.Seq[Alpakka]) = {
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
  .withBootstrapServers("localhost:9092")

Source(seq).
  map(alpakka => new ProducerRecord[String, String]("alpakkas", alpakka.asJson.noSpaces))
  .runWith(Producer.plainSink(producerSettings))
}
...