Spark Невозможно оценить выражение: отставание выражения окна - PullRequest
0 голосов
/ 01 октября 2018

Я пытаюсь выполнить большое количество операций над кадром данных из таблицы cassandra, а затем сохранить его в другой таблице.Одна из этих операций выглядит следующим образом:

val leadWindow = Window.partitionBy(col("id")).orderBy(col("timestamp").asc).rowsBetween(Window.currentRow, 2)
df.withColumn("lead1", lag(sum(col("temp1")).over(leadWindow), 2, 0))

Когда я выполняю свою работу, я получаю исключение о том, что операция lag не может быть оценена ..

2018-10-08 12:02:22 INFO  Cluster:1543 - New Cassandra host /127.0.0.1:9042 added
    2018-10-08 12:02:22 INFO  CassandraConnector:35 - Connected to Cassandra cluster: Test Cluster
    2018-10-08 12:02:23 INFO  CassandraSourceRelation:35 - Input Predicates: [IsNotNull(ts)]
    2018-10-08 12:02:23 INFO  CassandraSourceRelation:35 - Input Predicates: [IsNotNull(ts)]
    Exception in thread "main" java.lang.UnsupportedOperationException: Cannot evaluate expression: lag(input[43, bigint, true], 2, 0)
            at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:258)
            at org.apache.spark.sql.catalyst.expressions.OffsetWindowFunction.doGenCode(windowExpressions.scala:326)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
            at org.apache.spark.sql.catalyst.expressions.BinaryExpression.nullSafeCodeGen(Expression.scala:496)
            at org.apache.spark.sql.catalyst.expressions.BinaryExpression.defineCodeGen(Expression.scala:479)
            at org.apache.spark.sql.catalyst.expressions.Add.doGenCode(arithmetic.scala:174)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
            at org.apache.spark.sql.catalyst.expressions.BinaryExpression.nullSafeCodeGen(Expression.scala:496)
            at org.apache.spark.sql.catalyst.expressions.BinaryExpression.defineCodeGen(Expression.scala:479)
            at org.apache.spark.sql.catalyst.expressions.BinaryComparison.doGenCode(predicates.scala:513)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
            at org.apache.spark.sql.catalyst.expressions.And.doGenCode(predicates.scala:397)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
            at org.apache.spark.sql.catalyst.expressions.CaseWhen$$anonfun$8.apply(conditionalExpressions.scala:202)
            at org.apache.spark.sql.catalyst.expressions.CaseWhen$$anonfun$8.apply(conditionalExpressions.scala:201)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
            at scala.collection.immutable.List.foreach(List.scala:381)
            at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
            at scala.collection.immutable.List.map(List.scala:285)
            at org.apache.spark.sql.catalyst.expressions.CaseWhen.doGenCode(conditionalExpressions.scala:201)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
            at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:142)
            at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
            at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
            at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
            at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
            at scala.collection.AbstractTraversable.map(Traversable.scala:104)
            at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:60)
            at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
            at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:354)
            at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:383)
            at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
            at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
            at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
            at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:354)
            at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
            at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
            at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
            at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
            at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
            at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:524)
            at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:576)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
            at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
            at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
            at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
            at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2975)
            at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2973)
            at org.apache.spark.sql.cassandra.CassandraSourceRelation.insert(CassandraSourceRelation.scala:76)
            at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:86)
            at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
            at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
            at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
            at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
            at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
            at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
            at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
            at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
            at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
            at com.test.functions.package$ChecksFunctions.appendToTable(package.scala:66)
            at com.test.TestFromCassandra$.main(TestFromCassandra.scala:66)
            at com.test.TestFromCassandra.main(TestFromCassandra.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
            at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
            at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
            at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    2018-10-08 12:02:31 INFO  CassandraConnector:35 - Disconnected from Cassandra cluster: Test Cluster

Строка номер 130 файла TestFromCassandra является вызовом функции save().Я не нашел подобной проблемы в Stackoverflow ..

Кто-нибудь знает, почему я столкнулся с этим исключением?Есть ли у функции lag какие-либо ограничения с функцией прокрутки sum?

РЕДАКТИРОВАТЬ: Я нашел аналогичная проблема на Джира Спарк.Кажется, есть ошибка в функции filter после ссылки на функцию window, и так как соединитель cassandra фильтрует кадры данных на элементах первичного ключа (используя функцию isnotnull) перед сохранением, это то, чтоможет вызвать исключение.Есть ли способ выполнить эту операцию, избегая этой ошибки, но без использования функции агрегирования?Или кто-то знает, как исправить эту ошибку?

EDIT 2: Я также пытался сохранить свой фрейм данных, используя записывающее устройство foreach и функцию соединителя withSessionDo, но все равно получаютакое же исключение .. Никто никогда не сталкивался с этой проблемой?

РЕДАКТИРОВАТЬ 3: Я нашел другой способ добиться желаемой операции:

val leadWindow = Window.partitionBy(col("id")).orderBy(col("timestamp").desc).rowsBetween(Window.currentRow, 2)
df.withColumn("lead1", sum(col("temp1")).over(leadWindow))

Проблемане из-за фильтра.Кажется, что просто невозможно использовать функцию задержки в выражении окна.

Ответы [ 2 ]

0 голосов
/ 09 октября 2018

Я столкнулся с той же проблемой, а затем заметил, что вы используете функцию over over lag (как у меня).Я изменил что-то вроде этого:

df.withColumn("lag1", lag(sum(col("temp1")), 2, 0).over(lagWindow))

0 голосов
/ 04 октября 2018

Я вижу ту же ошибку.Несмотря на то, что есть обходной путь для этой проблемы, искра должна исправить это.Я считаю, что вы решите эту проблему с помощью любой оконной функции, а не только LAG.Я считаю, что причина в том, что spark пытается выполнить генерацию кода на фильтре, но оконные функции не поддерживают кодирование.Обходным путем будет создание столбца с этим выражением окна и использование этого столбца в фильтре.

...