Написание файла разделенного паркета с помощью SparkR - PullRequest
2 голосов
/ 13 июня 2019

У меня есть два сценария, один на R и короткий второй на pyspark, который использует вывод.Я пытаюсь скопировать эту функциональность в первый скрипт для простоты.

Второй скрипт очень прост - прочитайте кучу CSV-файлов и сгенерируйте их как разделенный паркет:

spark.read.csv(path_to_csv, header = True) \
     .repartition(partition_column).write \
     .partitionBy(partition_column).mode('overwrite') \
     .parquet(path_to_parquet)

Это должно быть одинаково просто в R, но я не могу понять, как соответствовать функциональности partitionBy в SparkR.Я получил это до сих пор:

library(SparkR); library(magrittr)
read.df(path_to_csv, 'csv', header = TRUE) %>%
  repartition(col = .$partition_column) %>%
  write.df(path_to_parquet, 'parquet', mode = 'overwrite')

Это успешно записывает один файл паркета для каждого значения partition_column.Проблема в том, что испускаемые файлы имеют неправильную структуру каталогов;тогда как Python производит что-то вроде

/path/to/parquet/
  partition_column=key1/
    file.parquet.gz
  partition_column=key2/
    file.parquet.gz
  ...

R производит только

/path/to/parquet/
  file_for_key1.parquet.gz
  file_for_key2.parquet.gz
  ...

Я что-то упустил?функция partitionBy в SparkR появляется только для ссылки на контекст оконных функций, и я не вижу в руководстве ничего другого, что могло бы быть связано.Возможно, есть способ передать что-то в ..., но я не вижу примеров в документации или поиске в Интернете.

1 Ответ

1 голос
/ 13 июня 2019

Разделение вывода не поддерживается в Spark <= 2.x. </p>

Однако он будет поддерживаться в SparR> = 3.0.0 ( SPARK-21291 - R partitionBy API ) со следующим синтаксисом:

write.df(
  df, path_to_csv, "parquet", mode = "overwrite",
  partitionBy = "partition_column"
)

Поскольку соответствующий PR изменяет только R-файлы, вы должны иметь возможность исправлять любой дистрибутив SparkR 2.x, если обновление до версии для разработки не является возможным:

git clone https://github.com/apache/spark.git
git checkout v2.4.3  # Or whatever branch you use
# https://github.com/apache/spark/commit/cb77a6689137916e64bc5692b0c942e86ca1a0ea
git cherry-pick cb77a6689137916e64bc5692b0c942e86ca1a0ea
R -e "devtools::install('R/pkg')"

В режиме клиента это должно требоваться только на узле драйвера.

но это не смертельно и не должно вызывать серьезных проблем.

...