The columns feature_1_(double) and feature_2_(double) in the data frame below (df2) are built with a dictionary lookup inside a UDF; a sketch of what those lookups might look like follows the schema below. Calling
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Wrap the dictionary-lookup functions as UDFs that return doubles
feature_1 = udf(lambda row: func1(row), DoubleType())
feature_2 = udf(lambda row: func2(row), DoubleType())
df1 = df.withColumn("feature_1_(double)", feature_1("domain_ids"))
df2 = df1.withColumn("feature_2_(double)", feature_2("domain_ids")).drop("domain_ids")
df2.show(5, False)
returns the following data frame:
+----------+-------+------------------+------------------+----+
|session_id|serp_id|feature_1_(double)|feature_2_(double)|url1|
+----------+-------+------------------+------------------+----+
| 0| 0| 2.0| 0.0| 0|
| 3| 1| 1.0| 0.0| 0|
| 4| 0| 5.0| 0.0| 0|
| 5| 2| 2.0| 0.0| 2|
| 5| 0| 3.0| 0.0| 1|
+----------+-------+------------------+------------------+----+
which conforms to the following schema:
root
|-- session_id: integer (nullable = true)
|-- serp_id: integer (nullable = true)
|-- feature_1_(double): double (nullable = false)
|-- feature_2_(double): double (nullable = false)
|-- url1: integer (nullable = true)
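For context, func1 and func2 are plain dictionary lookups of roughly the shape below. This is a hypothetical sketch: the mapping (domain_relevance) and its contents are placeholders, and the real lookup lives in the relevance function named in the traceback further down.
# Hypothetical sketch of func1; domain_relevance is a placeholder mapping
domain_relevance = {0: 2.0, 1: 1.0, 2: 5.0}

def func1(domain_id):
    # Raises KeyError when domain_id is absent from the mapping,
    # e.g. KeyError: 567 as in the traceback below
    return float(domain_relevance[domain_id])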
To train a multi-class classifier that estimates class probabilities for url1, I want to assemble features 1 and 2 into a single vector using VectorAssembler. However, running
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["feature_1_(double)", "feature_2_(double)"],
    outputCol="features")
df3 = assembler.transform(df2)
df3.show(5)
returns the error below. Checking whether the columns in df3 contain null values (following this approach) results in the same error; the check is sketched next. Any help is appreciated.
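A sketch of that null check (paraphrased; df3[c] is used because of the parentheses in the column names):
# Count nulls per feature column; count() forces the UDFs to run,
# which is why this surfaces the same KeyError
for c in ["feature_1_(double)", "feature_2_(double)"]:
    print(c, df3.filter(df3[c].isNull()).count())
The full traceback: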
Py4JJavaError: An error occurred while calling o2456.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 89 in stage 533.0 failed 4 times, most recent failure: Lost task 89.3 in stage 533.0 (TID 49754, st446-w-0.europe-west2-b.c.sincere-hearth-236110.internal, executor 23): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 253, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 248, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 331, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 140, in dump_stream
for obj in iterator:
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 320, in _batched
for item in iterator:
File "<string>", line 1, in <lambda>
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 76, in <lambda>
return lambda *a: f(*a)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
return f(*args, **kwargs)
File "<ipython-input-102-205c8e3fad14>", line 2, in <lambda>
File "<ipython-input-101-533a3b67fe1f>", line 45, in relevance
KeyError: 567
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:332)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:66)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:286)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage112.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage113.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:223)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:52)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:249)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1992)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:172)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1661)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1649)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1648)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1648)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1882)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1820)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2703)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
at sun.reflect.GeneratedMethodAccessor108.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: [same Python traceback and executor stack as above]