Использование futures в Spark, вероятно, не рекомендуется, за исключением особых случаев, и простое распараллеливание вычислений не входит в их число (вполне возможно, что для блокировки ввода-вывода (например, для выполнения запросов к стороннему сервису) используется неблокирующая оболочка).единственный особый случай).
Обратите внимание, что Future
не гарантирует параллелизм (зависит от того, как и как они выполняются параллельно, зависит от ExecutionContext
, в котором они выполняются), только асинхронность.Кроме того, в случае, если вы порождаете выполняющее вычисления будущее в преобразовании Spark (т. Е. На исполнителе, а не на драйвере), есть вероятность, что улучшения производительности не будет, так как Spark обычно выполняет хорошую работуподдерживая занятость ядер на исполнителях, все, что порождает эти фьючерсы, борется за ядра со Spark.
В общем, будьте очень внимательны при объединении абстракций параллелизма, таких как Spark RDD / DStreams / Dataframes, актеры и фьючерсы: естьмножество потенциальных минных полей, где такие комбинации могут нарушать гарантии и / или соглашения в различных компонентах.
Стоит также отметить, что у Spark есть требования в отношении сериализуемости промежуточных значений и что фьючерсы, как правило, не сериализуемы, поэтомуСтадия искры не может привести к будущему;это означает, что в принципе у вас нет выбора, кроме Await
для фьючерсов, порожденных на стадии.
Если вы все еще хотите порождать фьючерсы на стадии Spark (например, отправлять их в веб-службу), возможно, этоЛучше всего использовать Future.sequence
, чтобы свернуть фьючерсы в один, а затем Await
на этом (обратите внимание, что я не проверял эту идею: я предполагаю, что существует неявное CanBuildFrom[Iterator[Future[String]], String, Future[String]]
доступное):
def postString(s: String): Future[Unit] = ???
def postStringRDD(rdd: RDD[String]): RDD[String] = {
rdd.mapPartitions { strings =>
// since this is only get used for combining the futures in the Await, it's probably OK to use the implicit global execution context here
implicit val ectx = ???
Await.result(strings.map(postString))
}
rdd // Pass through the original RDD
}