Ваша реализация уже распараллелена, так что ура!
Чтобы добавить больше деталей:
foreach
в искре - это action
, который используется для выполнения некоторых операций с побочными эффектами. Он работает на RDD в JVM исполнителя, если в режиме кластера работает spark.
Если вы хотите избавиться от foreach
все вместе, тогда вы можете перевести его в UDF и вызвать его. Тем не менее, это не очень хорошая практика, потому что, основываясь на вашем примере, вы не надеетесь получить какой-либо результат от REST API. Осторожно: впереди уродство
import org.apache.spark.sql.functions.udf
val inputDF = spark.read.format("csv").option("header", true).option("inferSchema", true).load(getClass.getResource("/FifaData.csv").getPath).toDF()
var url = ""
val baseUrl = "http://localhost:8080/countries/search?"
val nationalityDF = inputDF.select("Nationality").distinct.rdd.zipWithIndex()
.asDF("nationality", "index")
val callRestApi: (nationality, idx)=> String = {
val url = s"""${baseUrl}page=${idx}&nameList=${nationality.mkString(",")}"""
println("url:: " + url)
null
}
nationalityDF.withColumn("placeHolder", callRestApi($"nationality", $"index")).drop("placeHolder")