Я делаю приложение, которое выбирает данные изasticsearch и отправляет их в kafka.Но функция factory.send () не работает внутри карты, однако за ее пределами все работает идеально
val f1 = ElasticsearchSource
.create(
indexName = "products",
typeName = "product",
query = """{"match_all": {}}"""
)
.map { message: OutgoingMessage[spray.json.JsObject] =>
val product = message.source
producer.send(new ProducerRecord("test", product))
println("publishing message ")
IncomingMessage(Some(message.id), message.source)
}
.runWith(Sink.seq)
Что может быть причиной этого?