Установите значение столбца в зависимости от предыдущих с помощью Spark, не повторяя атрибут группировки - PullRequest
2 голосов
/ 06 ноября 2019

Учитывая DataFrame:

+------------+---------+
|variableName|dataValue|
+------------+---------+
|       IDKey|       I1|
|           b|        y|
|           a|        x|
|       IDKey|       I2|
|           a|        z|
|           b|        w|
|           c|        q|
+------------+---------+

Я хочу создать новый столбец с соответствующими значениями IDKey, где каждое значение изменяется при каждом изменении значения dataValue для IDKey, вот ожидаемый результат:

+------------+---------+----------+
|variableName|dataValue|idkeyValue|
+------------+---------+----------+
|       IDKey|       I1|        I1|
|           b|        y|        I1|
|           a|        x|        I1|
|       IDKey|       I2|        I2|
|           a|        z|        I2|
|           b|        w|        I2|
|           c|        q|        I2|
+------------+---------+----------+

Я попытался сделать следующий код, который использует mapPartitions() и глобальную переменную

var currentVarValue = ""
frame
  .mapPartitions{ partition =>
    partition.map { row =>
      val (varName, dataValue) = (row.getString(0), row.getString(1))

      val idKeyValue = if (currentVarValue != dataValue && varName == "IDKey") {
        currentVarValue = dataValue
        dataValue
      } else {
        currentVarValue
      }

      ExtendedData(varName, dataValue, currentVarValue)
    }
  }

Но это не сработает из-за двух фундаментальных вещей: Spark не обрабатывает глобальные переменные а также, это не соответствует функциональному стилю программирования

Я буду рад любой помощи по этому вопросу Спасибо!

1 Ответ

0 голосов
/ 06 ноября 2019

Вы не можете решить это элегантно и с высокой производительностью способом Spark, поскольку для Spark недостаточно исходной информации для обработки всех данных, которые гарантированно находятся в одном разделе. Если мы выполняем всю обработку в одном и том же разделе, то это не является истинным намерением Spark.

Фактически не может быть создано разумное разбиениеBy (через функцию Window). Проблема здесь заключается в том, что данные представляют собой длинный список последовательных таких данных , для которых потребуется поиск по разделам, если данные в предыдущем разделе относятся к текущему разделу. Это можно сделать, но это настоящая работа. Нулевой 323 имеет ответ где-то здесь, который пытается решить эту проблему, но если я правильно помню, это громоздко.

Логика сделать это достаточно проста, но использование Spark для этого проблематично.

Без данных partitionBy все перетасовываются в один раздел и могут привести к проблемам с OOM и пространством.

Извините.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...