Сбой SparkR с массивами в качестве входных данных - PullRequest
2 голосов
/ 21 апреля 2020

Я работаю с dapply (а также dapplyCollect) функциями в SparkR, которые не работают, когда в качестве входных данных используются массивы.

Мои входные данные находятся в SparkDataFrame, считанном из файла партера из уже существующий процесс (я не могу ничего изменить в этом процессе). Этот фрейм данных состоит из многих миллионов строк и нескольких столбцов данных, хранящихся в виде массивов. Вот упрощенный пример, созданный из фрейма данных R.

> R.df <- data.frame(key = 1:3)
> R.df$data <- list(seq(1, 10))
> R.df
  key                          data
1   1 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
2   2 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
3   3 1, 2, 3, 4, 5, 6, 7, 8, 9, 10

Тот же фрейм данных, преобразованный в SparkDataFrame:

> Spark.df <- createDataFrame(R.df)
> Spark.df
SparkDataFrame[key:int, data:array<int>]

Далее мы выполняем обработку с пользовательской функцией и ошибкой. брошено:

my.complicated.process <- function(partition) {
  ## Here we would do something to each partition
  ## For this test, return data unchanged
  return(partition)
}

spark.results <- dapply(Spark.df, my.complicated.process, schema(Spark.df))
head(spark.results)

Error in handleErrors(returnStatus, conn) : 
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 55.0 failed 1 times, most recent failure: Lost task 0.0 in stage 55.0 (TID 55, localhost, executor driver): org.apache.spark.SparkException: R computation failed with
 Error in (function (..., deparse.level = 1, make.row.names = TRUE, stringsAsFactors = default.stringsAsFactors(),  : 
  invalid list argument: all variables should have the same length
Calls: compute -> do.call -> <Anonymous>
Execution halted
    at org.apache.spark.api.r.RRunner.compute(RRunner.scala:113)
    at org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(MapPartitionsRWrapper.scala:58)
    at org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(MapPartitionsRWrapper.scala:29)
    at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$5.apply(objects.scala:188)
    at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$5.apply(objects.scala:185)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInter

Мне удалось обойти эту проблему:

  1. По примеру здесь . Используя serialize и unserialize для преобразования массива в простой двоичный тип. Недостатком этого является необходимость сериализации перед созданием SparkDataFrame.
  2. Путем преобразования массива в строку, разделенную запятыми, например, с помощью to_json, затем с использованием комбинации strsplit, paste и as.integer в R и split, cast в SparkR до go между массивом цифр c и строкой
  3. При использовании explode в SparkR для разделения столбца массива на несколько строк и поворачиваем фрейм данных, чтобы получить столбцы, затем используем dapply над этими новыми столбцами.

Ни одно из этих решений не является оптимальным, поэтому я надеялся найти способ для dapply, чтобы оба брали массив типы в качестве входных данных, а также возвращают типы массивов.

Я использую R v3.6.3 с SparkR v2.4.5.

...