Multiple PySpark operations fail on a dataframe
1 vote
/ 01 July 2019

The columns feature_1_(double) and feature_2_(double) in the dataframe below (df2) are created via a dictionary lookup inside a udf (a sketch of such a lookup follows the schema below). The call

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Wrap the dictionary-lookup functions func1/func2 as UDFs returning doubles
feature_1 = udf(lambda row: func1(row), DoubleType())
feature_2 = udf(lambda row: func2(row), DoubleType())
df1 = df.withColumn("feature_1_(double)", feature_1("domain_ids"))
df2 = df1.withColumn("feature_2_(double)", feature_2("domain_ids")).drop("domain_ids")
df2.show(5, False)

returns the following dataframe:

+----------+-------+------------------+------------------+----+
|session_id|serp_id|feature_1_(double)|feature_2_(double)|url1|
+----------+-------+------------------+------------------+----+
|         0|      0|               2.0|               0.0|   0|
|         3|      1|               1.0|               0.0|   0|
|         4|      0|               5.0|               0.0|   0|
|         5|      2|               2.0|               0.0|   2|
|         5|      0|               3.0|               0.0|   1|
+----------+-------+------------------+------------------+----+

which has the following schema:

root
 |-- session_id: integer (nullable = true)
 |-- serp_id: integer (nullable = true)
 |-- feature_1_(double): double (nullable = false)
 |-- feature_2_(double): double (nullable = false)
 |-- url1: integer (nullable = true)

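For context, the feature functions are dictionary lookups along these lines (a minimal sketch; domain_feature_map, its contents, and the function body are assumptions, since the actual relevance code from the notebook is not shown here):

# Hypothetical lookup table; the real dictionary is built elsewhere.
domain_feature_map = {0: 2.0, 1: 1.0, 2: 5.0}

def func1(domain_id):
    # Unguarded lookup: raises KeyError when domain_id is absent.
    return domain_feature_map[domain_id]
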
To use a multi-class classifier to estimate class probabilities for url1, I want to assemble features 1 and 2 into a single vector column using VectorAssembler. However, running

from pyspark.ml.feature import VectorAssembler

# Combine the two feature columns into a single vector column
assembler = VectorAssembler(
    inputCols=["feature_1_(double)", "feature_2_(double)"],
    outputCol="features")

df3 = assembler.transform(df2)
df3.show(5)

returns the following error. Checking whether the columns in df3 contain null values leads to the same error.

Any help is appreciated.

Py4JJavaError: An error occurred while calling o2456.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 89 in stage 533.0 failed 4 times, most recent failure: Lost task 89.3 in stage 533.0 (TID 49754, st446-w-0.europe-west2-b.c.sincere-hearth-236110.internal, executor 23): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 253, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 248, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 331, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 140, in dump_stream
    for obj in iterator:
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 320, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 76, in <lambda>
    return lambda *a: f(*a)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-102-205c8e3fad14>", line 2, in <lambda>
  File "<ipython-input-101-533a3b67fe1f>", line 45, in relevance
KeyError: 567

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:332)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:66)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:286)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage112.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage113.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:223)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:52)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:249)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1992)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:172)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1661)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1649)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1648)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1648)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1882)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1820)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2703)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
    at sun.reflect.GeneratedMethodAccessor108.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 253, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 248, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 331, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 140, in dump_stream
    for obj in iterator:
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 320, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 76, in <lambda>
    return lambda *a: f(*a)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-102-205c8e3fad14>", line 2, in <lambda>
  File "<ipython-input-101-533a3b67fe1f>", line 45, in relevance
KeyError: 567

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:332)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:66)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:286)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage112.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage113.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:223)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:52)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:249)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1992)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:172)
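
The KeyError: 567 at the bottom of both tracebacks points to the underlying cause: the dictionary used inside the udf (line 45 of relevance in the notebook) has no entry for key 567. Because Spark evaluates lazily, the udf only runs when an action such as df3.show(5) forces computation, so VectorAssembler and the null check merely trigger the same latent failure. A guarded lookup avoids the crash (a sketch, reusing the hypothetical domain_feature_map above; the 0.0 default is an assumption):

def func1(domain_id):
    # dict.get returns a default instead of raising KeyError
    # for ids that are missing from the lookup table.
    return domain_feature_map.get(domain_id, 0.0)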