Искра вызвана: java.lang.StackOverflowError Функция окна? - PullRequest
0 голосов
/ 30 мая 2019

Я столкнулся с ошибкой, вызванной оконной функцией.

Когда я применяю этот сценарий и сохраняю только несколько строк образца, он работает нормально, однако, когда я применяю его ко всему набору данных (всего несколько ГБ) он завершается с этой причудливой ошибкой на последнем шаге при попытке сохранения в hdfs ... скрипт работает, когда я сохраняю без оконной функции, поэтому проблема должна быть из-за этого ( У меня около 325 столбцов функций, работающих через цикл for ).

Есть идеи, что может быть причиной проблемы? Моя цель - просто рассчитать данные временных рядов с помощью метода прямой заливки для каждой переменной в моем фрейме данных.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window
import sys
print(spark.version)
'2.3.0'

# sample data
df = spark.createDataFrame([('2019-05-10 7:30:05', '1', '10', '0.5', 'FALSE'),\
                            ('2019-05-10 7:30:10', '2', 'UNKNOWN', '0.24', 'FALSE'),\
                            ('2019-05-10 7:30:15', '3', '6', 'UNKNOWN', 'TRUE'),\
                            ('2019-05-10 7:30:20', '4', '7', 'UNKNOWN', 'UNKNOWN'),\
                            ('2019-05-10 7:30:25', '5', '10', '1.1', 'UNKNOWN'),\
                            ('2019-05-10 7:30:30', '6', 'UNKNOWN', '1.1', 'NULL'),\
                            ('2019-05-10 7:30:35', '7', 'UNKNOWN', 'UNKNOWN', 'TRUE'),\
                            ('2019-05-10 7:30:49', '8', '50', 'UNKNOWN', 'UNKNOWN')], ["date", "id", "v1", "v2", "v3"])

df = df.withColumn("date", F.col("date").cast("timestamp"))

# imputer process / all cols that need filled are strings
def stringReplacer(x, y):
    return F.when(x != y, x).otherwise(F.lit(None)) # replace with NULL

def forwardFillImputer(df, cols=[], partitioner="date", value="UNKNOWN"):
  for i in cols:
    window = Window\
    .partitionBy(F.month(partitioner))\
    .orderBy(partitioner)\
    .rowsBetween(-sys.maxsize, 0)

    df = df\
    .withColumn(i, stringReplacer(F.col(i), value))
    fill = F.last(df[i], ignorenulls=True).over(window)
    df = df.withColumn(i,  fill)
  return df
df2 = forwardFillImputer(df, cols=[i for i in df.columns])

# errors here
df2\
.write\
.format("csv")\
.mode("overwrite")\
.option("header", "true")\
.save("test_window_func.csv")
Py4JJavaError: An error occurred while calling o13504.save.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.StackOverflowError
    at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:200)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:200)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:200)
    at scala.collection.immutable.List.foreach(List.scala:381)

возможный рабочий раствор

def forwardFillImputer(df, cols=[], partitioner="date", value="UNKNOWN"):
    window = Window \
     .partitionBy(F.month(partitioner)) \
     .orderBy(partitioner) \
     .rowsBetween(-sys.maxsize, 0)
    imputed_cols = [F.last(stringReplacer(F.col(i), value), ignorenulls=True).over(window).alias(i) 
                    for i in cols]
    missing_cols = [i for i in df.columns if i not in cols]
    return df.select(missing_cols+imputed_cols)

df2 = forwardFillImputer(df, cols=[i for i in df.columns[1:]])

df2.printSchema()
root
 |-- date: timestamp (nullable = true)
 |-- id: string (nullable = true)
 |-- v1: string (nullable = true)
 |-- v2: string (nullable = true)
 |-- v3: string (nullable = true)

df2.show()
+-------------------+---+---+----+-----+
|               date| id| v1|  v2|   v3|
+-------------------+---+---+----+-----+
|2019-05-10 07:30:05|  1| 10| 0.5|FALSE|
|2019-05-10 07:30:10|  2| 10|0.24|FALSE|
|2019-05-10 07:30:15|  3|  6|0.24| TRUE|
|2019-05-10 07:30:20|  4|  7|0.24| TRUE|
|2019-05-10 07:30:25|  5| 10| 1.1| TRUE|
|2019-05-10 07:30:30|  6| 10| 1.1| NULL|
|2019-05-10 07:30:35|  7| 10| 1.1| TRUE|
|2019-05-10 07:30:49|  8| 50| 1.1| TRUE|
+-------------------+---+---+----+-----+

1 Ответ

2 голосов
/ 30 мая 2019

По предоставленной трассировке стека, я считаю, что ошибка связана с подготовкой плана выполнения, как говорится:

Caused by: java.lang.StackOverflowError
    at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:200)

Я считаю, что причина этого в том, что вы вызываете метод .withColumn дважды в цикле. То, что .withColumn делает в плане выполнения Spark, - это в основном оператор select для всех столбцов, в которых 1 столбец был изменен, как указано в методе. Если у вас есть 325 столбцов, то для одной итерации это вызовет select на 325 столбцов дважды -> 650 столбцов, переданных в планировщик. Проделав это 325 раз, вы увидите, как это может привести к накладным расходам.

Однако очень интересно, что вы не получите эту ошибку для небольшой выборки, я бы ожидал иначе.

В любом случае вы можете попробовать заменить ваш forwardFillImputer следующим образом:

def forwardFillImputer(df, cols=[], partitioner="date", value="UNKNOWN"):
    window = Window \
     .partitionBy(F.month(partitioner)) \
     .orderBy(partitioner) \
     .rowsBetween(-sys.maxsize, 0)

    imputed_cols = [F.last(stringReplacer(F.col(i), value), ignorenulls=True).over(window).alias(i) 
                    for i in cols]

    missing_cols = [F.col(i) for i in df.columns if i not in cols]

    return df.select(missing_cols + imputed_cols)

Таким образом, вы в основном просто разбираете в планировщике один оператор выбора, который должен быть проще в обработке.

Как предупреждение, как правило, Spark плохо справляется с большим количеством столбцов, поэтому по пути вы можете увидеть другие странные проблемы.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...