Суммирующие массивы, сгруппированные по ключу в искре (scala) - PullRequest
0 голосов
/ 29 ноября 2018

У меня есть следующий DataFrame:

[info] root
[info]  |-- id: integer (nullable = true)
[info]  |-- vec: array (nullable = true)
[info]  |    |-- element: double (containsNull = false)
[info] +----+--------------------+
[info] |  id|                 vec|
[info] +----+--------------------+
[info] |  59|[-0.17827, 0.4178...|
[info] |  59|[-0.17827, 0.4178...|
[info] |  79|[-0.17827, 0.4178...|
[info] | 280|[-0.17827, 0.4178...|
[info] | 385|[-0.17827, 0.4178...|
[info] | 419|[-0.17827, 0.4178...|
[info] | 514|[-0.17827, 0.4178...|
[info] | 757|[-0.17827, 0.4178...|
[info] | 787|[-0.17827, 0.4178...|
[info] |1157|[-0.17827, 0.4178...|
[info] |1157|[-0.17827, 0.4178...|
[info] |1400|[-0.17827, 0.4178...|
[info] |1632|[-0.17827, 0.4178...|
[info] |1637|[-0.17827, 0.4178...|
[info] |1639|[-0.17827, 0.4178...|
[info] |1747|[-0.17827, 0.4178...|
[info] |1869|[-0.17827, 0.4178...|
[info] |1929|[-0.17827, 0.4178...|
[info] |1929|[-0.17827, 0.4178...|
[info] |2059|[-0.17827, 0.4178...|
[info] +----+--------------------+

Я пытаюсь сгруппировать его по идентификатору, а затем найти средний вектор (массив представляет вектор) для каждого идентификатора.Усреднение сейчас казалось немного сложным, так как я даже не мог понять, как суммировать!

Я написал следующий код:

val aggregated = joined_df
     .rdd
     .map{ case Row(k: Int, v: Array[Double]) => (k, v) }
     .reduceByKey((acc,element) => (acc, element).zipped.map(_+_))
     .toDF("id", "vec")

Каждый раз, когда я запускаю его, я получаю следующий стекtrace:

    [error] Driver stacktrace:
[error]     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
[error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
[error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
[error]     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[error]     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[error]     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
[error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
[error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
[error]     at scala.Option.foreach(Option.scala:257)
[error]     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
[error]     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
[error]     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
[error]     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
[error]     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
[error]     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
[error]     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
[error]     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
[error]     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
[error]     at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
[error]     at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
[error]     at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
[error]     at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
[error]     at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
[error]     at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
[error]     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
[error]     at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
[error]     at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
[error]     at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
[error]     at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
[error]     at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
[error]     at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
[error]     at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
[error]     at Main$.main(Main.scala:101)
[error]     at Main.main(Main.scala)
[error] Caused by: scala.MatchError: [59,WrappedArray(-0.17827, 0.41787, 0.043321, -0.2759, 0.71027, -0.17696, 0.1804, -0.14864, -0.37659, 0.29254, -0.40274, 0.59584, -0.95916, -0.15567, 0.76168, 0.088067, 0.6846, -0.39884, 0.01839, -0.025578, -0.67058, 0.51273, 0.78468, -0.12751, 0.46849, -1.3988, -0.73757, -0.11943, 1.5621, -0.66478, 3.3061, 0.48236, -0.73916, -0.2679, -0.47081, -0.18434, 0.36776, -0.51161, 0.060674, -0.087342, -0.20121, -0.53426, 0.45001, -0.015149, -0.070133, 0.35922, -0.25262, 0.18598, 0.12959, 0.87333)] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
[error]     at Main$$anonfun$2.apply(Main.scala:97)
[error]     at Main$$anonfun$2.apply(Main.scala:97)
[error]     at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
[error]     at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:193)
[error]     at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
[error]     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
[error]     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
[error]     at org.apache.spark.scheduler.Task.run(Task.scala:109)
[error]     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
[error]     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
[error]     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[error]     at java.base/java.lang.Thread.run(Thread.java:844)

Я впервые использую Spark, но я достиг своего предела поиска в Google.У меня кончились подсказки.По всем моим поискам это должно было сработать.

1 Ответ

0 голосов
/ 29 ноября 2018

Я думаю, что это может быть проблемой.Количество элементов в вашем векторе должно совпадать с предложением match.

val vec1 = Vector(1,2,3)

vec1 match {
  case Vector(a, b, c) => println("vector matched")
}

vec1 match {
  case Vector(a, b) => println("vector matched")
}

В приведенном выше примере первый будет успешным, но позже потерпит неудачу.

scala.MatchError: [59,WrappedArray(-0 может быть подсказкой.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...