Я работаю с dapply
(а также dapplyCollect
) функциями в SparkR, которые не работают, когда в качестве входных данных используются массивы.
Мои входные данные находятся в SparkDataFrame, считанном из файла партера из уже существующий процесс (я не могу ничего изменить в этом процессе). Этот фрейм данных состоит из многих миллионов строк и нескольких столбцов данных, хранящихся в виде массивов. Вот упрощенный пример, созданный из фрейма данных R.
> R.df <- data.frame(key = 1:3)
> R.df$data <- list(seq(1, 10))
> R.df
key data
1 1 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
2 2 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
3 3 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
Тот же фрейм данных, преобразованный в SparkDataFrame:
> Spark.df <- createDataFrame(R.df)
> Spark.df
SparkDataFrame[key:int, data:array<int>]
Далее мы выполняем обработку с пользовательской функцией и ошибкой. брошено:
my.complicated.process <- function(partition) {
## Here we would do something to each partition
## For this test, return data unchanged
return(partition)
}
spark.results <- dapply(Spark.df, my.complicated.process, schema(Spark.df))
head(spark.results)
Error in handleErrors(returnStatus, conn) :
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 55.0 failed 1 times, most recent failure: Lost task 0.0 in stage 55.0 (TID 55, localhost, executor driver): org.apache.spark.SparkException: R computation failed with
Error in (function (..., deparse.level = 1, make.row.names = TRUE, stringsAsFactors = default.stringsAsFactors(), :
invalid list argument: all variables should have the same length
Calls: compute -> do.call -> <Anonymous>
Execution halted
at org.apache.spark.api.r.RRunner.compute(RRunner.scala:113)
at org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(MapPartitionsRWrapper.scala:58)
at org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(MapPartitionsRWrapper.scala:29)
at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$5.apply(objects.scala:188)
at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$5.apply(objects.scala:185)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInter
Мне удалось обойти эту проблему:
- По примеру здесь . Используя
serialize
и unserialize
для преобразования массива в простой двоичный тип. Недостатком этого является необходимость сериализации перед созданием SparkDataFrame. - Путем преобразования массива в строку, разделенную запятыми, например, с помощью
to_json
, затем с использованием комбинации strsplit
, paste
и as.integer
в R и split
, cast
в SparkR до go между массивом цифр c и строкой - При использовании
explode
в SparkR для разделения столбца массива на несколько строк и поворачиваем фрейм данных, чтобы получить столбцы, затем используем dapply
над этими новыми столбцами.
Ни одно из этих решений не является оптимальным, поэтому я надеялся найти способ для dapply
, чтобы оба брали массив типы в качестве входных данных, а также возвращают типы массивов.
Я использую R v3.6.3 с SparkR v2.4.5.