Как получить доступ к внешнему фрейму данных в функции карты rdd? - PullRequest
0 голосов
/ 13 марта 2019

У меня есть два кадра данных.

countryDF

+-------+-------------------+--------+---------+
|   id  |    CountryName    |Latitude|Longitude|
+-------+-------------------+--------+---------+
|  1    | United States     |  39.76 |   -98.5 |
|  2    | China             |  35    |   105   |
|  3    | India             |  20    |   77    |
|  4    | Brazil            |  -10   |   -55   |
...
+-------+-------------------+--------+---------+

salesDF

+-------+-------------------+--------+---------+--------+
|   id  |    Country        |Latitude|Longitude|revenue |
+-------+-------------------+--------+---------+--------+
|  1    | Japan             |        |         |   11   |
|  2    | China             |        |         |   12   |
|  3    | Brazil            |        |         |   56   |
|  4    | Scotland          |        |         |   12   |
...
+-------+-------------------+--------+---------+--------+

Задача - создать широту и долготу для salesDF. Это будет искать каждую ячейку столбца salesDF "Страна" из столбца countryDF "CountryName". Если найдена строка, добавьте к ней соответствующие «Широта» и «Долгота».

Выходной кадр данных:

+-------+-------------------+--------+---------+---------+
|   id  |    CountryName    |Latitude|Longitude|revenue  |
+-------+-------------------+--------+---------+---------+
|  1    | Japan             |  35.6  |   139   | 11      |
|  2    | China             |  35    |   105   | 12      |
|  3    | Brazil            |  -10   |   -55   | 56      |
|  4    | Scotland          |  55.95 |  -3.18  | 12      |
...
+-------+-------------------+--------+---------+---------+

Я пишу функцию карты для выполнения операции. Но, похоже, функция map не может получить доступ к внешней переменной dataframe. Любые решения?

val countryDF = spark.read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("Country.csv")

var revenueDF = spark.read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("revenue.csv")

var resultRdd = revenueDF.rdd.map(row => {
  val generateRow = (row: Row, latitude: Any, longitude: Any, latidudeIndex: Int, longitudeIndex: Int) => {
    val arr = row.toSeq.toArray
    arr(latidudeIndex) = latitude
    arr(longitudeIndex) = longitude
    Row.fromSeq(arr)
  }
  val countryName = row.getAs[String](1)
  // cannot access countryDF, it is corrupted
  val countryRow = countryDF.where(col("CountryName") === countryName)
  generateRow(row, row.getAs[String](2), row.getAs[String](3),2, 3)

})
revenueDF.sqlContext.createDataFrame(resultRdd, revenueDF.schema).show()

1 Ответ

0 голосов
/ 13 марта 2019

Операция, которую вы ищете: присоединение

salesDF.select("id", "Country").join(
  countryDF.select("CountryName", "Latitude", "Longitude")
  $"CountryName" === $"Country",
  "left"
).drop("Country")

И нет, вы не можете использовать DataFrames, RDD и другие распределенные объекты в map, udf или эквивалентном.

...