Spark распараллелить с функциональностью столбца - PullRequest
0 голосов
/ 19 сентября 2018

Цель

Если withColumn еще не работает параллельно, мне нужно распараллелить функциональность withColumn, чтобы преобразовать значения в (большом) столбце DataFrame намного быстрее.

Фон

Я использую PySpark.

У меня есть файл format.json, который содержит определенные операции поиска по регулярному выражению и инструкции по преобразованию операций.

Вот пример:

{
  "currency": {
    "dtype": "float64",
    "searches": [
      {
        "regex_search": "^[\\s]*\\$[+-]?[0-9]{1,3}(?:,?[0-9]{3})*(?:\\.[0-9]{2})?[\\s]*$",
        "op": {
          "replace": {
            "regex": "[$,]",
            "sub": ""
          }
        }
      },
      {
        "regex_search": "^[\\s]*See Award Doc[\\s]*$",
        "op": {
          "replace": {
            "regex": "^.*$",
            "sub": ""
          }
        }
      }
    ]
  }
}

У меня есть UDF, который по сути просматривает столбец и пытается сопоставить и преобразовать данные.

# transform data so that column can be cast properly
udf_match_and_transform = f.udf(lambda data: match_and_transform(data, format_dict), StringType())
df = df.withColumn(column_name, udf_match_and_transform(df[column_name]))

Функция match_and_transform, по сути, просматривает словарь форматирования и пытается сопоставитьданное значение в формат и затем возвращает преобразование этого значения.

В последней строке используется withColumn, чтобы по существу отобразить UDF в столбец, заменяя старый столбец вновь преобразованным столбцом.Проблема в том, что, кажется, это занимает очень много времени по сравнению с другими операциями, которые я выполняю над столбцами, такими как частота, процентиль, среднее, минимальное, максимальное, стандартное отклонение и т. Д.

TLDR

Itпохоже, что функция withColumn в DataFrame не использует несколько ядер;как получить аналогичную функциональность, но с распараллеливанием?

Примечания

Я тестирую это локально на машине с 12 ядрами и 16 ГБ оперативной памяти.

Я знаю, что могупреобразовать DataFrame в RDD и распараллелить, но я бы хотел этого избежать, если есть другой способ.

Я также знаю, что было бы быстрее использовать Scala / Java UDF, но я думаю, что это все равновыполняться с одним ядром, используя withColumn?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...