У меня есть два кадра данных.
countryDF
+-------+-------------------+--------+---------+
| id | CountryName |Latitude|Longitude|
+-------+-------------------+--------+---------+
| 1 | United States | 39.76 | -98.5 |
| 2 | China | 35 | 105 |
| 3 | India | 20 | 77 |
| 4 | Brazil | -10 | -55 |
...
+-------+-------------------+--------+---------+
salesDF
+-------+-------------------+--------+---------+--------+
| id | Country |Latitude|Longitude|revenue |
+-------+-------------------+--------+---------+--------+
| 1 | Japan | | | 11 |
| 2 | China | | | 12 |
| 3 | Brazil | | | 56 |
| 4 | Scotland | | | 12 |
...
+-------+-------------------+--------+---------+--------+
Задача - создать широту и долготу для salesDF. Это будет искать каждую ячейку столбца salesDF "Страна" из столбца countryDF "CountryName". Если найдена строка, добавьте к ней соответствующие «Широта» и «Долгота».
Выходной кадр данных:
+-------+-------------------+--------+---------+---------+
| id | CountryName |Latitude|Longitude|revenue |
+-------+-------------------+--------+---------+---------+
| 1 | Japan | 35.6 | 139 | 11 |
| 2 | China | 35 | 105 | 12 |
| 3 | Brazil | -10 | -55 | 56 |
| 4 | Scotland | 55.95 | -3.18 | 12 |
...
+-------+-------------------+--------+---------+---------+
Я пишу функцию карты для выполнения операции. Но, похоже, функция map не может получить доступ к внешней переменной dataframe. Любые решения?
val countryDF = spark.read
.option("inferSchema", "true")
.option("header", "true")
.csv("Country.csv")
var revenueDF = spark.read
.option("inferSchema", "true")
.option("header", "true")
.csv("revenue.csv")
var resultRdd = revenueDF.rdd.map(row => {
val generateRow = (row: Row, latitude: Any, longitude: Any, latidudeIndex: Int, longitudeIndex: Int) => {
val arr = row.toSeq.toArray
arr(latidudeIndex) = latitude
arr(longitudeIndex) = longitude
Row.fromSeq(arr)
}
val countryName = row.getAs[String](1)
// cannot access countryDF, it is corrupted
val countryRow = countryDF.where(col("CountryName") === countryName)
generateRow(row, row.getAs[String](2), row.getAs[String](3),2, 3)
})
revenueDF.sqlContext.createDataFrame(resultRdd, revenueDF.schema).show()