Как вы упомянули выше в своем ответе, когда когда-либо произойдет действие, потребуется некоторое время, чтобы вставить его в тонуть.sliding
можно улучшить следующим образом ...
Исходя из этого предположения, у меня есть 2 варианта для вас.Вы можете выбрать наиболее подходящий ...
Вариант № 1: (foreachPartitionAsync
: AsyncRDDActions ) Я бы предложил вам использоватьDataFrame
возможности группировки итераторов
df.repartition(numofpartitionsyouwant) // numPartitions
df.rdd.foreachPartitionAsync // since its partition wise processing to sink it would be faster than the approach you are adopting...
{
partitionIterator =>
partitionIterator.grouped(2000).foreach {
group => group.foreach {
// do your insertions here or what ever you wanted to ....
}
}
}
Примечание: СДР будет выполняться в фоновом режиме.Все эти исполнения будут отправлены в планировщик Spark и запущены одновременно.В зависимости от размера кластера Spark, некоторые задания могут ждать, пока исполнители не станут доступны для обработки.
Опция № 2:
Второй подходэто фрейм данных как randomSplit Я думаю, что вы можете использовать в этом случае для разделения фреймов данных одинакового размера.который вернет вам массив данных одинакового размера, если сумма их весов> 1. Примечание: веса (первый аргумент dataframe) для разбиений будут нормализованы, если они не равны 1.
DataFrame[] randomSplit (double [] weights) Произвольно разделяет этот DataFrame с предоставленными весами.
ссылается randomSplit
code здесь
это будет похоже..
val equalsizeddfArray = yourdf.randomSplit(Array(0.2,0.2,0.2,0.2,0.2,0.2, 0.3) // intentionally gave sum of weights > 1 (in your case 10000 records of dataframe to array of 5 dataframes of each 2000 records in it)
и затем ...
for (i <- 0 until equalsizeddfArray.length) {
// your logic ....
}
Примечание : вышеуказанная логика является последовательной ... Если вы хотите выполнитьих параллельно (если они независимы) вы можете использовать
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
// Now wait for the tasks to finish before exiting the app
Await.result(Future.sequence(Seq(yourtaskfuncOndf1(),yourtaskfuncOndf2()...,yourtaskfuncOndf10())), Duration(10, MINUTES))
Из вышеперечисленных 2 вариантов, я бы предпочел подход № 2, так как функция randomSplit будет приниматьзаботиться (путем нормализации весов) о делении равных размеров для их обработки