Я хочу создать пустой DataFrame на основе существующего DataFrame Spark. Я использую поддержку PyArrow (она включена в конфигурации Spark). Когда я пытаюсь создать пустой DataFrame из пустого RDD с той же схемой, что и у существующего DataFrame, а затем вызываю toPandas(), я получаю исключение java.lang.NegativeArraySizeException. Вот полный код для воспроизведения ошибки:
from pyspark.sql import SparkSession

# Minimal reproduction: calling toPandas() on an empty DataFrame fails with
# java.lang.NegativeArraySizeException when the Arrow optimization is enabled.
spark = (
    SparkSession.builder
    .config("spark.sql.execution.arrow.enabled", "true")
    .getOrCreate()
)

# Small single-column DataFrame; only its schema is reused below.
df = spark.createDataFrame(["10", "11", "13"], "string").toDF("age")

# NOTE(review): sparkContext.emptyRDD() has zero partitions. The failing frame
# in the trace is Dataset.collectAsArrowToPython (Dataset.scala:3293), which
# presumably sizes an array from the partition count and goes negative when
# there are no partitions -- confirm against the Spark source. Building the
# empty RDD with at least one partition, e.g.
# spark.sparkContext.parallelize([], 1), is a likely workaround.
empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema)

# Fails here when Arrow is enabled; succeeds once Arrow is disabled.
empty_pandas_df = empty_df.toPandas()
А вот полная трассировка стека:
/conda_env/lib/python3.6/site-packages/pyarrow/__init__.py:157: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
warnings.warn("pyarrow.open_stream is deprecated, please use "
/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py:2139: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.fallback.enabled' does not have an effect on failures in the middle of computation.
An error occurred while calling o349.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
at org.apache.spark.api.python.PythonServer.getResult(PythonRDD.scala:874)
at org.apache.spark.api.python.PythonServer.getResult(PythonRDD.scala:870)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NegativeArraySizeException
at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17.apply(Dataset.scala:3293)
at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17.apply(Dataset.scala:3287)
at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply$mcV$sp(PythonRDD.scala:456)
at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply(PythonRDD.scala:456)
at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply(PythonRDD.scala:456)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.api.python.PythonRDD$$anonfun$7.apply(PythonRDD.scala:457)
at org.apache.spark.api.python.PythonRDD$$anonfun$7.apply(PythonRDD.scala:453)
at org.apache.spark.api.python.SocketFuncServer.handleConnection(PythonRDD.scala:994)
at org.apache.spark.api.python.SocketFuncServer.handleConnection(PythonRDD.scala:988)
at org.apache.spark.api.python.PythonServer$$anonfun$11$$anonfun$apply$9.apply(PythonRDD.scala:853)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.api.python.PythonServer$$anonfun$11.apply(PythonRDD.scala:853)
at org.apache.spark.api.python.PythonServer$$anonfun$11.apply(PythonRDD.scala:852)
at org.apache.spark.api.python.PythonServer$$anon$1.run(PythonRDD.scala:908)
warnings.warn(msg)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-18-61602774c141> in <module>
----> 1 empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema)
/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py in toPandas(self)
2120 _check_dataframe_localize_timestamps
2121 import pyarrow
-> 2122 batches = self._collectAsArrow()
2123 if len(batches) > 0:
2124 table = pyarrow.Table.from_batches(batches)
/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py in _collectAsArrow(self)
2182 return list(_load_from_socket((port, auth_secret), ArrowStreamSerializer()))
2183 finally:
-> 2184 jsocket_auth_server.getResult() # Join serving thread and raise any exceptions
2185
2186 ##########################################################################################
/conda_env/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/conda_env/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/conda_env/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Ошибка исчезает, если отключить Arrow следующей настройкой:
# Turn the Arrow-based conversion off for this session (workaround).
spark.conf.set(key="spark.sql.execution.arrow.enabled", value="false")
Это известная проблема PySpark или она связана с PyArrow?
Примечание: эта ошибка воспроизводится только в PySpark >= 2.4.4.