Spark: можно ли увеличить буфер пиарроу? - PullRequest
0 голосов
/ 07 ноября 2019

Я пытаюсь передать большой (~ 30 ГБ) фрейм данных в pandas_udf в spark следующим образом:

@f.pandas_udf(gen_udf_schema(), f.PandasUDFType.GROUPED_MAP)
def _my_udf(df):
    # ... do df work ...
    return df

df = df.groupBy('some_col').apply(_my_udf)

Я попытался увеличить память исполнителя, память драйвера и драйвер maxResultSize, но все еще получаюошибка памяти pyarrow подробно описана ниже в моем кластере. Есть ли эквивалент драйвера maxResultSize, т.е. executor maxResultSize, который я могу использовать, чтобы избежать этой ошибки? Там, кажется, не так много информации об этом онлайн.

Я не могу разделить фрейм данных, потому что на самом деле это объединение 1 маленького фрейма данных и 1 большого фрейма данных размером ~ 29 ГБ. Внутри моего udf я разделяю их и выполняю свою работу, а затем возвращаю только маленький кадр данных.

y4j.protocol.Py4JJavaError: An error occurred while calling o324.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 44 in stage 19.0 failed 4 times, most recent failure: Lost task 44.3 in stage 19.0 (TID 368, ip-172-31-13-57.us-west-2.compute.internal, executor 3): org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
    at org.apache.arrow.vector.BaseVariableWidthVector.reallocBufferHelper(BaseVariableWidthVector.java:547)
    at org.apache.arrow.vector.BaseVariableWidthVector.reallocValidityAndOffsetBuffers(BaseVariableWidthVector.java:529)
    at org.apache.arrow.vector.BaseVariableWidthVector.handleSafe(BaseVariableWidthVector.java:1221)
    at org.apache.arrow.vector.BaseVariableWidthVector.fillEmpties(BaseVariableWidthVector.java:881)
    at org.apache.arrow.vector.BaseVariableWidthVector.setSafe(BaseVariableWidthVector.java:1062)
    at org.apache.spark.sql.execution.arrow.StringWriter.setValue(ArrowWriter.scala:242)
    at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:121)
    at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:86)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:85)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:76)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:76)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:96)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
...