Producer.send не работает внутри .map - PullRequest
0 голосов
/ 19 октября 2018

Я делаю приложение, которое выбирает данные изasticsearch и отправляет их в kafka.Но функция factory.send () не работает внутри карты, однако за ее пределами все работает идеально

val f1 = ElasticsearchSource
  .create(
    indexName = "products",
    typeName = "product",
    query = """{"match_all": {}}"""
  )
  .map { message: OutgoingMessage[spray.json.JsObject] =>
    val product = message.source
    producer.send(new ProducerRecord("test", product))
    println("publishing message ")
    IncomingMessage(Some(message.id), message.source)
  }
  .runWith(Sink.seq)

Что может быть причиной этого?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...