Как обрабатывать данные Dataframe параллельно, чтобы вызвать url с большим количеством параметров - PullRequest
0 голосов
/ 09 апреля 2019

Я хочу прочитать файл .csv, в котором есть информация об игроках.Я должен получить страну из этого CSV и добавить его к URL для дальнейшего процесса.

Сначала я загружаю данные .csv в фрейм данных.затем я делаю цикл на нем, чтобы добавить национальность к URL-адресу, как показано ниже:

    val inputDF = spark.read.format("csv").option("header", true).option("inferSchema", true).load(getClass.getResource("/FifaData.csv").getPath).toDF()
    var url = ""
    val baseUrl = "http://localhost:8080/countries/search?"

    val nationalityDF = inputDF.select("Nationality").distinct.rdd.zipWithIndex()
    nationalityDF.foreach { case (nationality, idx) =>
        val url = s"${baseUrl}page=${idx}&nameList=${nationality.get(0)}"
        println("url:: " + url)
    }

Интересно, могу ли я избежать обработки данных для каждого пользователя и вызвать ссылку без для каждого?

1 Ответ

2 голосов
/ 10 апреля 2019

Ваша реализация уже распараллелена, так что ура!

Чтобы добавить больше деталей: foreach в искре - это action, который используется для выполнения некоторых операций с побочными эффектами. Он работает на RDD в JVM исполнителя, если в режиме кластера работает spark.

Если вы хотите избавиться от foreach все вместе, тогда вы можете перевести его в UDF и вызвать его. Тем не менее, это не очень хорошая практика, потому что, основываясь на вашем примере, вы не надеетесь получить какой-либо результат от REST API. Осторожно: впереди уродство

import org.apache.spark.sql.functions.udf
val inputDF = spark.read.format("csv").option("header", true).option("inferSchema", true).load(getClass.getResource("/FifaData.csv").getPath).toDF()
var url = ""
val baseUrl = "http://localhost:8080/countries/search?"

val nationalityDF = inputDF.select("Nationality").distinct.rdd.zipWithIndex()
                           .asDF("nationality", "index")

val callRestApi: (nationality, idx)=> String = {
    val url = s"""${baseUrl}page=${idx}&nameList=${nationality.mkString(",")}"""
    println("url:: " + url)
    null
}

nationalityDF.withColumn("placeHolder", callRestApi($"nationality", $"index")).drop("placeHolder")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...