У меня есть сценарий использования, когда я управляю наборами потоковых данных, создаю внешний API для обогащения набора данных и записываю его в приемник. Что я делаю до сих пор:
val simpleDS: Dataset[SimpleModel] = spark
.readstream
.format("kafka")
.option(..)..
def enrich(model: SimpleModel): EnrichedModel = {
val fut: Future[Int] = lookupLabel(model.id)
val enrich: Int = Await.result(fut, 5.seconds)
EnrichModel(model.id, enrich)
}
val enrichedDS = dataset.map(enrich)
enrichedDS
.toJson
.writeStream
.format("kafka")
.option(..)..
Хотя это работает, я не уверен насчет части Await.result
, так как она блокируется. Тем не менее, future.onComplete
, который не является блокирующим, похоже, заинтересован в побочном эффекте (Unit
), а не в значении, возвращенном будущим (Int
). Есть ли способ для меня использовать неблокирующий вызов, чтобы получить значение, возвращаемое Future
?