У меня есть таблица, подобная следующей:
+----------+----+--------------+-------------+
| Date|Hour| Weather|Precipitation|
+----------+----+--------------+-------------+
|2013-07-01| 0| null| null|
|2013-07-01| 3| null| null|
|2013-07-01| 6| clear|trace of p...|
|2013-07-01| 9| null| null|
|2013-07-01| 12| null| null|
|2013-07-01| 15| null| null|
|2013-07-01| 18| rain| null|
|2013-07-01| 21| null| null|
|2013-07-02| 0| null| null|
|2013-07-02| 3| null| null|
|2013-07-02| 6| rain|low precip...|
|2013-07-02| 9| null| null|
|2013-07-02| 12| null| null|
|2013-07-02| 15| null| null|
|2013-07-02| 18| null| null|
|2013-07-02| 21| null| null|
+----------+----+--------------+-------------+
Идея состоит в том, чтобы заполнить столбцы Weather
и Precipitation
значениями в 6 и 18 часов и в 6 часов соответственно. Поскольку эта таблица иллюстрирует структуру DataFrame
, простая итерация в этом направлении выглядит иррациональной.
Я пробовал что-то вроде этого:
//_weather stays for the table mentioned
def fillEmptyCells: Unit = {
val hourIndex = _weather.schema.fieldIndex("Hour")
val dateIndex = _weather.schema.fieldIndex("Date")
val weatherIndex = _weather.schema.fieldIndex("Weather")
val precipitationIndex = _weather.schema.fieldIndex("Precipitation")
val days = _weather.select("Date").distinct().rdd
days.foreach(x => {
val day = _weather.where("Date == $x(0)")
val dayValues = day.where("Hour == 6").first()
val weather = dayValues.getString(weatherIndex)
val precipitation = dayValues.getString(precipitationIndex)
day.rdd.map(y => (_(0), _(1), weather, precipitation))
})
}
Однако этот уродливый кусок кода, кажется, пахнет из-за итерации RDD
вместо распределенной обработки. Он также должен формировать новые RDD
или DataFrame
из частей, что может быть проблематично (я понятия не имею, как это сделать). Есть ли более элегантный и простой способ решить эту задачу?