Как сделать членские операции над массивом в spark sql? - PullRequest
0 голосов
/ 16 октября 2018

В spark-sql у меня есть фрейм данных со столбцом col, который содержит массив Int размера 100 (например).

Я хочу объединить этот столбец в одно значение, которое является массивомиз Int размера 100, который содержит сумму каждого элемента столбца.Это можно сделать, вызвав:

dataframe.agg(functions.array((0 until 100).map(i => functions.sum(i)) : _*))

Это сгенерирует код для явного выполнения 100 агрегаций, а затем представит 100 результатов в виде массива из 100 элементов.Однако это кажется очень неэффективным, так как катализатор даже не сможет сгенерировать код для этого, если размер моего массива превышает ~ 1000 элементов.Есть ли в spark-sql конструкция, чтобы сделать это более эффективно?В идеале должна быть возможность автоматически распространять агрегацию sum по массиву, чтобы получить членскую сумму, но я не нашел ничего связанного с этим в документе.Какие есть альтернативы моему коду?

edit: мой traceback:

   ERROR codegen.CodeGenerator: failed to compile: org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass": Code of method "processNext()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" grows beyond 64 KB
org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass": Code of method "processNext()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" grows beyond 64 KB
    at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
    at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446)
    at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313)
    at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1002)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1069)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1066)
    at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
    at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
    at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
    at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:948)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:375)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1.apply(HashAggregateExec.scala:97)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1.apply(HashAggregateExec.scala:92)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doExecute(HashAggregateExec.scala:92)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1.apply(HashAggregateExec.scala:97)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1.apply(HashAggregateExec.scala:92)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doExecute(HashAggregateExec.scala:92)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:88)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:124)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:115)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:435)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:471)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
    at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:597)
    at com.criteo.enterprise.eligibility_metrics.RankingMetricsComputer$.runAndSaveMetrics(RankingMetricsComputer.scala:286)
    at com.criteo.enterprise.eligibility_metrics.RankingMetricsComputer$.main(RankingMetricsComputer.scala:366)
    at com.criteo.enterprise.eligibility_metrics.RankingMetricsComputer.main(RankingMetricsComputer.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)

1 Ответ

0 голосов
/ 16 октября 2018

Лучший способ сделать это - преобразовать вложенные массивы в их собственные строки, чтобы вы могли использовать один groupBy.Таким образом, вы можете сделать все это в одной агрегации вместо 100 (или более).Ключом к этому является использование posexplode, которое превратит каждую запись в массиве в новую строку с индексом, в котором она была расположена в массиве.

Например:

import org.apache.spark.sql.functions.{posexplode, collect_list}

val data = Seq(
    (Seq(1, 2, 3, 4, 5)),
    (Seq(2, 3, 4, 5, 6)),
    (Seq(3, 4, 5, 6, 7))
)

val df = data.toDF

val df2 = df.
    select(posexplode($"value")).
    groupBy($"pos").
    agg(sum($"col") as "sum")

// At this point you will have rows with the index and the sum
df2.orderBy($"pos".asc).show

Выводит DataFrame следующим образом:

+---+---+
|pos|sum|
+---+---+
|  0|  6|
|  1|  9|
|  2| 12|
|  3| 15|
|  4| 18|
+---+---+

Или, если вы хотите, чтобы они были в одной строке, вы могли бы объявить что-то вроде этого:

df2.groupBy().agg(collect_list(struct($"pos", $"sum")) as "list").show

Значения в столбце Array не будутне может быть отсортировано, но вы можете написать UDF для сортировки по полю pos и удалить поле pos, если хотите это сделать.

Обновлено за комментарий

Если описанный выше подход не работает с какими-либо другими агрегатами, которые вы пытаетесь сделать, вам нужно будет определить свой собственный UDAF.Общая идея здесь заключается в том, что вы говорите Spark, как объединять значения для одного и того же ключа внутри раздела для создания промежуточных значений, а затем как объединять эти промежуточные значения между разделами для создания окончательного значения для каждого ключа.Определив класс UDAF, вы можете использовать его в вызове aggs с любыми другими агрегатами, которые вы хотели бы сделать.

Вот быстрый пример, который я выбил.Обратите внимание, что он принимает длину массива, и, вероятно, его следует сделать более защищенным от ошибок, но он поможет вам в этом.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction


class ArrayCombine extends UserDefinedAggregateFunction {
  // The input this aggregation will receive (each row)
  override def inputSchema: org.apache.spark.sql.types.StructType =
    StructType(StructField("value", ArrayType(IntegerType)) :: Nil)

  // Your intermediate state as you are updating with data from each row
  override def bufferSchema: StructType = StructType(
    StructType(StructField("value", ArrayType(IntegerType)) :: Nil)
  )

  // This is the output type of your aggregatation function.
  override def dataType: DataType = ArrayType(IntegerType)

  override def deterministic: Boolean = true

  // This is the initial value for your buffer schema.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = (0 until 100).toArray
  }

  // Given a new input row, update our state
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val sums = buffer.getSeq[Int](0)
    val newVals = input.getSeq[Int](0)

    buffer(0) = sums.zip(newVals).map { case (a, b) => a + b }
  }

  // After we have finished computing intermediate values for each partition, combine the partitions
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val sums1 = buffer1.getSeq[Int](0)
    val sums2 = buffer2.getSeq[Int](0)

    buffer1(0) = sums1.zip(sums2).map { case (a, b) => a + b }
  }

  // This is where you output the final value, given the final value of your bufferSchema.
  override def evaluate(buffer: Row): Any = {
    buffer.getSeq[Int](0)
  }
}

Затем вызовите его так:

val arrayUdaf = new ArrayCombine()
df.groupBy().agg(arrayUdaf($"value")).show
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...