Spark refuses to create an empty DataFrame when using pyarrow
1 vote
/ 19 September 2019

I want to create an empty DataFrame from an existing Spark DataFrame. I am using pyarrow support (enabled in the Spark config). When I try to create an empty DataFrame from an empty RDD with the same schema as my existing DataFrame, I get a java.lang.NegativeArraySizeException. Here is the complete code to reproduce the error:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
                    .config("spark.sql.execution.arrow.enabled", "true") \
                    .getOrCreate()
df = spark.createDataFrame(["10", "11", "13"], "string").toDF("age")
empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema)
empty_pandas_df = empty_df.toPandas()

And here is the full stack trace:

/conda_env/lib/python3.6/site-packages/pyarrow/__init__.py:157: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py:2139: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.fallback.enabled' does not have an effect on failures in the middle of computation.
  An error occurred while calling o349.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
    at org.apache.spark.api.python.PythonServer.getResult(PythonRDD.scala:874)
    at org.apache.spark.api.python.PythonServer.getResult(PythonRDD.scala:870)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NegativeArraySizeException
    at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17.apply(Dataset.scala:3293)
    at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17.apply(Dataset.scala:3287)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply$mcV$sp(PythonRDD.scala:456)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply(PythonRDD.scala:456)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply(PythonRDD.scala:456)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7.apply(PythonRDD.scala:457)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7.apply(PythonRDD.scala:453)
    at org.apache.spark.api.python.SocketFuncServer.handleConnection(PythonRDD.scala:994)
    at org.apache.spark.api.python.SocketFuncServer.handleConnection(PythonRDD.scala:988)
    at org.apache.spark.api.python.PythonServer$$anonfun$11$$anonfun$apply$9.apply(PythonRDD.scala:853)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.api.python.PythonServer$$anonfun$11.apply(PythonRDD.scala:853)
    at org.apache.spark.api.python.PythonServer$$anonfun$11.apply(PythonRDD.scala:852)
    at org.apache.spark.api.python.PythonServer$$anon$1.run(PythonRDD.scala:908)

  warnings.warn(msg)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-18-61602774c141> in <module>
----> 1 empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema)

/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py in toPandas(self)
   2120                         _check_dataframe_localize_timestamps
   2121                     import pyarrow
-> 2122                     batches = self._collectAsArrow()
   2123                     if len(batches) > 0:
   2124                         table = pyarrow.Table.from_batches(batches)

/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py in _collectAsArrow(self)
   2182                 return list(_load_from_socket((port, auth_secret), ArrowStreamSerializer()))
   2183             finally:
-> 2184                 jsocket_auth_server.getResult()  # Join serving thread and raise any exceptions
   2185
   2186     ##########################################################################################

/conda_env/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/conda_env/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/conda_env/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

The error goes away when I disable pyarrow with

spark.conf.set("spark.sql.execution.arrow.enabled","false")

Is this a known issue with pyspark, or is it related to pyarrow?

Note: this error is reproducible only with pyspark >= 2.4.4.
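One way to keep Arrow enabled is to avoid calling toPandas() on an empty DataFrame in the first place. A minimal sketch (the isEmpty guard is my own suggestion, not from the original post; it assumes the empty_df from the snippet above):

import pandas as pd

# Workaround sketch: skip the Arrow-backed toPandas() when the
# DataFrame has no rows, and build the pandas frame from the Spark
# column names instead.
if empty_df.rdd.isEmpty():
    empty_pandas_df = pd.DataFrame(columns=empty_df.columns)
else:
    empty_pandas_df = empty_df.toPandas()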

1 Answer

0 votes
/ 22 September 2019

A workaround is to collect the RDD and build the pandas DataFrame from the result, as shown below. There was also another issue in your code: a ':' was used where a ',' belongs.

from pyspark.sql import SparkSession
import pyarrow as pa  # not used directly, but pyarrow must be installed for the Arrow code path
import pandas as pd


spark = SparkSession.builder.config("spark.sql.execution.arrow.enabled", "true").getOrCreate()

df = spark.createDataFrame(["10", "11", "13"], "string").toDF("age")

empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema, verifySchema=True)
# collect() bypasses the Arrow-backed toPandas() path, so the empty
# DataFrame no longer triggers NegativeArraySizeException
empty_rows = empty_df.collect()
empty_pandas_df = pd.DataFrame(empty_rows)

print(empty_pandas_df)
df.show()

Output:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/09/22 11:08:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Empty DataFrame
Columns: []
Index: []
[Stage 2:>                                                          (0 + 3) / 3]
+---+
|age|
+---+
| 10|
| 11|
| 13|
+---+
...
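Note that the collected list is empty, so pandas cannot infer the column names, which is why the output above shows Columns: []. A small variation (my own addition, not part of the original answer) that preserves the schema by passing the Spark column names explicitly:

# Passing the column names keeps the schema even with zero rows.
empty_pandas_df = pd.DataFrame(empty_df.collect(), columns=empty_df.columns)
print(empty_pandas_df)  # Empty DataFrame, Columns: [age], Index: []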