Spark Streaming groupBy / apply pandas UDF does not respect "spark.sql.execution.arrow.enabled"
0 votes / 02 March 2020

I am trying to apply a pandas GROUPED_MAP UDF as described here: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf

    .withWatermark("kafka_ts", "3 minutes") \
    .groupBy(
        window("kafka_ts", "3 minutes"),
        "grouping_key"
    ) \
    .apply(my_udf_func) \
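
For context, here is a minimal sketch of how my_udf_func could be declared as a GROUPED_MAP pandas UDF on Spark 2.4; the output schema, the "value" column and the aggregation are placeholders for illustration, not taken from the actual job:

    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    # hypothetical declaration: the schema and body here are assumptions, only the shape matters
    @pandas_udf("grouping_key string, total double", PandasUDFType.GROUPED_MAP)
    def my_udf_func(pdf):
        # pdf is a pandas DataFrame holding every row of one (window, grouping_key) group,
        # including the grouping columns themselves
        return pd.DataFrame({
            "grouping_key": [pdf["grouping_key"].iloc[0]],
            "total": [pdf["value"].sum()],
        })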

This fails at runtime with the following error:

ERROR   2020-03-02 06:42:27,241 36498   org.apache.spark.util.Utils [Executor task launch worker for task 171]  Aborting task
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 286, in dump_stream
    for series in iterator:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 304, in load_stream
    yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 304, in <listcomp>
    yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 274, in arrow_to_pandas
    s = _check_series_convert_date(s, from_arrow_type(arrow_column.type))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1672, in from_arrow_type
    raise TypeError("Unsupported type in conversion from Arrow: " + str(at))
TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=Etc/UTC], end: timestamp[us, tz=Etc/UTC]>
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:172)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

It looks like the error comes from the window struct, struct<start: timestamp[us, tz=Etc/UTC], end: timestamp[us, tz=Etc/UTC]>, being handed to Arrow for conversion, since that type is not supported. But disabling Arrow as shown below does not appear to turn off the Arrow-based conversion.

spark = SparkSession.builder \
    .master('local[*]') \
    .config('spark.executor.memory', '2g') \
    .config('spark.driver.memory','8g') \
    .config('spark.sql.execution.arrow.enabled', False) \
    .getOrCreate()

# nor this
spark.conf.set("spark.sql.execution.arrow.enabled", False)
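
As a quick sanity check (just a sketch, assuming the SparkSession is already up in the notebook), reading the setting back shows what the running session actually reports:

    # verify the value the active session sees for the flag
    print(spark.version)
    print(spark.conf.get("spark.sql.execution.arrow.enabled"))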

I am using a Jupyter notebook environment. Does anyone understand what is going on here?
