пишу кастомную свечу зажигания.В моем методе addBatch
я использую ForEachPartitionAsync
, который, если я не ошибаюсь, заставляет драйвер работать асинхронно, возвращая будущее.
val work: FutureAction[Unit] = rdd.foreachPartitionAsync { rows =>
val sourceInfo: StreamSourceInfo = serializeRowsAsInputStream(schema, rows)
val ackIngestion = Future {
ingestRows(sourceInfo) } andThen {
case Success(ingestion) => ackIngestionDone(partitionId, ingestion)
}
Await.result(ackIngestion, timeOut) // I would like to remove this line..
}
work onSuccess {
case _ => // move data from temporary table, report success of all workers
}
work onFailure{
//delete tmp data
case t => throw t.getCause
}
Я не могу найти способ запустить рабочие узлы, не блокируя вызов Await, как будто я удаляю их, об успешном сообщается объекту work
future, хотя будущее недействительно закончить.
Есть ли способ сообщить водителю, что все рабочие завершили свои асинхронные задания?
Примечание. Я посмотрел на функцию foreachPartitionAsync
иу него есть только одна реализация, которая ожидает функцию, которая возвращает модуль (я бы ожидал, что другая будет возвращать будущее или, может быть, CountDownLatch ..)