Заполните пустые ячейки дубликатами в DataFrame - PullRequest
0 голосов
/ 10 января 2019

У меня есть таблица, подобная следующей:

    +----------+----+--------------+-------------+
    |      Date|Hour|       Weather|Precipitation|
    +----------+----+--------------+-------------+
    |2013-07-01|   0|          null|         null|
    |2013-07-01|   3|          null|         null|
    |2013-07-01|   6|         clear|trace of p...|
    |2013-07-01|   9|          null|         null|
    |2013-07-01|  12|          null|         null|
    |2013-07-01|  15|          null|         null|
    |2013-07-01|  18|          rain|         null|
    |2013-07-01|  21|          null|         null|
    |2013-07-02|   0|          null|         null|
    |2013-07-02|   3|          null|         null|
    |2013-07-02|   6|          rain|low precip...|
    |2013-07-02|   9|          null|         null|
    |2013-07-02|  12|          null|         null|
    |2013-07-02|  15|          null|         null|
    |2013-07-02|  18|          null|         null|
    |2013-07-02|  21|          null|         null|
    +----------+----+--------------+-------------+

Идея состоит в том, чтобы заполнить столбцы Weather и Precipitation значениями в 6 и 18 часов и в 6 часов соответственно. Поскольку эта таблица иллюстрирует структуру DataFrame, простая итерация в этом направлении выглядит иррациональной. Я пробовал что-то вроде этого:

//_weather stays for the table mentioned
def fillEmptyCells: Unit = {
    val hourIndex = _weather.schema.fieldIndex("Hour")
    val dateIndex = _weather.schema.fieldIndex("Date")
    val weatherIndex = _weather.schema.fieldIndex("Weather")
    val precipitationIndex = _weather.schema.fieldIndex("Precipitation")

    val days = _weather.select("Date").distinct().rdd
    days.foreach(x => {
      val day = _weather.where("Date == $x(0)")
      val dayValues = day.where("Hour == 6").first()
      val weather = dayValues.getString(weatherIndex)
      val precipitation = dayValues.getString(precipitationIndex)
      day.rdd.map(y => (_(0), _(1), weather, precipitation))
    })
  }

Однако этот уродливый кусок кода, кажется, пахнет из-за итерации RDD вместо распределенной обработки. Он также должен формировать новые RDD или DataFrame из частей, что может быть проблематично (я понятия не имею, как это сделать). Есть ли более элегантный и простой способ решить эту задачу?

1 Ответ

0 голосов
/ 10 января 2019

Предполагая, что вы можете легко создать столбец timestamp, комбинируя Date и Hour, я бы сделал следующее:

  1. преобразовать это timestamp (возможно, в миллисекундах или секундах) в hourTimestamp: .withColumn("hourTimestamp", $"timestamp" // 3600)?
  2. создать 3 столбца, соответствующих различным возможным часовым лагам (3,6,9)
  3. coalesce эти 3 столбца + исходный

Вот код для Weather (сделайте то же самое для Precipitation):

val window = org.apache.spark.sql.expressions.Window.orderBy("hourTimestamp")
val weatherUpdate = df
                    .withColumn("WeatherLag1", lag("Weather", 3).over(window))
                    .withColumn("WeatherLag2", lag("Weather", 6).over(window))
                    .withColumn("WeatherLag3", lag("Weather", 9).over(window))
                    .withColumn("Weather",coalesce($"Weather",$"WeatherLag1",$"WeatherLag2",$"WeatherLag3"))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...