Приложение My Spark выглядит следующим образом:
1) выполнить большой запрос с помощью Spark SQL в фрейме данных «dataDF»
2) foreach-раздел, участвующий в «dataDF»:
2.1) получить связанный «отфильтрованный» фрейм данных, чтобы иметь только данные, связанные с разделом
2.2) выполнить определенную работу с этим «отфильтрованным» фреймом данных и записать вывод
Код выглядит следующим образом:
val dataSQL = spark.sql("SELECT ...")
val dataDF = dataSQL.repartition($"partition")
for {
row <- dataDF.dropDuplicates("partition").collect
} yield {
val partition_str : String = row.getAs[String](0)
val filtered = dataDF.filter($"partition" .equalTo( lit( partition_str ) ) )
// ... on each partition, do work depending on the partition, and write result on HDFS
// Example :
if( partition_str == "category_A" ){
// do group by, do pivot, do mean, ...
val x = filtered
.groupBy("column1","column2")
...
// write final DF
x.write.parquet("some/path")
} else if( partition_str == "category_B" ) {
// select specific field and apply calculation on it
val y = filtered.select(...)
// write final DF
x.write.parquet("some/path")
} else if ( ... ) {
// other kind of calculation
// write results
} else {
// other kind of calculation
// write results
}
}
Такой алгоритм успешно работает.SQL-запрос Spark полностью распределен.Однако конкретная работа, выполняемая с каждым результирующим разделом, выполняется последовательно, и результат является неэффективным, особенно потому, что каждая запись, связанная с разделом, выполняется последовательно.
В таком случае, какие способы заменить «на выход»"чем-то параллельно / асинхронно?
Спасибо