Разблокировка вызовов API с использованием искровой структурированной потоковой передачи - PullRequest
0 голосов
/ 03 мая 2020

У меня есть сценарий использования, когда я управляю наборами потоковых данных, создаю внешний API для обогащения набора данных и записываю его в приемник. Что я делаю до сих пор:

val simpleDS: Dataset[SimpleModel] = spark
.readstream
.format("kafka")
.option(..)..


def enrich(model: SimpleModel): EnrichedModel = {
    val fut: Future[Int] = lookupLabel(model.id)
    val enrich: Int = Await.result(fut, 5.seconds)

    EnrichModel(model.id, enrich)
}

val enrichedDS = dataset.map(enrich)

enrichedDS
.toJson
.writeStream
.format("kafka")  
.option(..)..

Хотя это работает, я не уверен насчет части Await.result, так как она блокируется. Тем не менее, future.onComplete, который не является блокирующим, похоже, заинтересован в побочном эффекте (Unit), а не в значении, возвращенном будущим (Int). Есть ли способ для меня использовать неблокирующий вызов, чтобы получить значение, возвращаемое Future?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...