Я написал один UDF для использования в spark с использованием python. Эта функция занимает
одна дата (в строке, например, «2017-01-06») и
один массив строк (например: [2017-01-26, 2017-02-26, 2017-04-17])
и вернуть #days с последней ближайшей даты. UDF составляет
Даже если я удаляю все пустые значения в столбце «activity_arr», я получаю эту ошибку NoneType . Пробовал применять обработку исключений внутри функции (все то же самое).
Есть ли у нас лучший способ отловить записи с ошибками во время выполнения из UDF (возможно, с использованием аккумулятора или около того, я видел, что мало кто пробовал то же самое с использованием scala)
----------------------------------------------- ---------------------------- Py4JJavaError Traceback (последний вызов
последний) в ()
----> 1 grouped_extend_df2.show ()
/ usr / lib / spark / python / pyspark / sql / dataframe.pyc в шоу (self, n,
усечение)
334 "" "
335 если isinstance (усечь, bool) и усечь:
-> 336 отпечатков (self._jdf.showString (n, 20))
Еще 337:
338 print (self._jdf.showString (n, int (truncate)))
/ usr / lib / spark / python / lib / py4j-0.10.4-src.zip / py4j / java_gateway.py в
вызов (self, * args) 1131 answer = self.gateway_client.send_command (команда) 1132 return_value
= get_return_value (
-> 1133 ответа, self.gateway_client, self.target_id, self.name) 1134 1135 для temp_arg в temp_args:
/ usr / lib / spark / python / pyspark / sql / utils.pyc in deco (* a, ** kw)
61 def deco (*, ** кВт):
62 попробуйте:
---> 63 возврата f (* a, ** кВт)
64 за исключением py4j.protocol.Py4JJavaError как e:
65 с = e.java_exception.toString ()
/ usr / lib / spark / python / lib / py4j-0.10.4-src.zip / py4j / protocol.py в
get_return_value (ответ, gateway_client, target_id, name)
317 поднять Py4JJavaError (
318 "Произошла ошибка при вызове {0} {1} {2}. \ N".
-> формат 319 (target_id, ".", Name), значение)
Еще 320:
321 рейз Py4JError (
Py4JJavaError: Произошла ошибка при вызове o1111.showString. :
org.apache.spark.SparkException: задание прервано из-за сбоя этапа:
Задача 0 на этапе 315.0 не выполнена 1 раз, последний сбой: потерянная задача
0,0 на этапе 315.0 (TID 18390, localhost, драйвер исполнителя): org.apache.spark.api.python.PythonException: обратная связь (самая последняя
последний звонок): File
"/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", строка 177,
в основном
Файл process () "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", строка 172,
в процессе
serializer.dump_stream (func (split_index, iterator), выходной файл) Файл "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", строка
104, в
func = lambda _, it: файл map (mapper, it) "", строка 1, в файле
"/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", строка 71, в
return lambda * a: f (* a) Файл "", строка 5, в findClosestPreviousDate TypeError: объект 'NoneType' не является
итерацию
в
org.apache.spark.api.python.PythonRunner $$ Анон $ 1.read (PythonRDD.scala: 193)
в
org.apache.spark.api.python.PythonRunner $$ Анон $ 1 (PythonRDD.scala: 234).
в
org.apache.spark.api.python.PythonRunner.compute (PythonRDD.scala: 152)
в
org.apache.spark.sql.execution.python.BatchEvalPythonExec $$ anonfun $ doExecute $ 1.Apply (BatchEvalPythonExec.scala: 144)
в
org.apache.spark.sql.execution.python.BatchEvalPythonExec $$ anonfun $ doExecute $ 1.Apply (BatchEvalPythonExec.scala: 87)
в
org.apache.spark.rdd.RDD $$ anonfun $ mapPartitions $ 1 $$ anonfun $ применять $ 23.apply (RDD.scala: 797)
вorg.apache.spark.rdd.RDD $$ anonfun $ mapPartitions $ 1 $$ anonfun $ применять $ 23.apply (RDD.scala: 797)
в
org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 38)
в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 323)
в org.apache.spark.rdd.RDD.iterator (RDD.scala: 287) в
org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 38)
в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 323)
в org.apache.spark.rdd.RDD.iterator (RDD.scala: 287) в
org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 38)
в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 323)
в org.apache.spark.rdd.RDD.iterator (RDD.scala: 287) в
org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 87) в
org.apache.spark.scheduler.Task.run (Task.scala: 108) в
org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 338)
в
java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
в
java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624)
at java.lang.Thread.run (Thread.java:748)
трассировка стека драйверов: в
org.apache.spark.scheduler.DAGScheduler.org $ апача $ искры $ планировщик $ DAGScheduler $$ failJobAndIndependentStages (DAGScheduler.scala: 1517)
в
org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.Apply (DAGScheduler.scala: 1505)
в
org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.Apply (DAGScheduler.scala: 1504)
в
scala.collection.mutable.ResizableArray $ class.foreach (ResizableArray.scala: 59)
в scala.collection.mutable.ArrayBuffer.foreach (ArrayBuffer.scala: 48)
в
org.apache.spark.scheduler.DAGScheduler.abortStage (DAGScheduler.scala: +1504)
в
org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.Apply (DAGScheduler.scala: 814)
в
org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.Apply (DAGScheduler.scala: 814)
в scala.Option.foreach (Option.scala: 257) в
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed (DAGScheduler.scala: 814)
в
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive (DAGScheduler.scala: 1732)
в
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive (DAGScheduler.scala: 1687)
в
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive (DAGScheduler.scala: 1676)
в org.apache.spark.util.EventLoop $$ anon $ 1.run (EventLoop.scala: 48)
в
org.apache.spark.scheduler.DAGScheduler.runJob (DAGScheduler.scala: 630)
в org.apache.spark.SparkContext.runJob (SparkContext.scala: 2029) в
org.apache.spark.SparkContext.runJob (SparkContext.scala: 2050) в
org.apache.spark.SparkContext.runJob (SparkContext.scala: 2069) в
org.apache.spark.sql.execution.SparkPlan.executeTake (SparkPlan.scala: 336)
в
org.apache.spark.sql.execution.CollectLimitExec.executeCollect (limit.scala: 38)
в
org.apache.spark.sql.Dataset.org $ апача $ искры $ SQL $ Dataset $$ collectFromPlan (Dataset.scala: 2861)
в
org.apache.spark.sql.Dataset $$ anonfun $ головы $ 1.Apply (Dataset.scala: 2150)
в
org.apache.spark.sql.Dataset $$ anonfun $ головы $ 1.Apply (Dataset.scala: 2150)
в org.apache.spark.sql.Dataset $$ anonfun $ 55.apply (Dataset.scala: 2842)
в
org.apache.spark.sql.execution.SQLExecution $ .withNewExecutionId (SQLExecution.scala: 65)
в org.apache.spark.sql.Dataset.withAction (Dataset.scala: 2841) в
org.apache.spark.sql.Dataset.head (Dataset.scala: 2150) в
org.apache.spark.sql.Dataset.take (Dataset.scala: 2363) в
org.apache.spark.sql.Dataset.showString (Dataset.scala: 241) в
sun.reflect.GeneratedMethodAccessor237.invoke (неизвестный источник) в
sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
в java.lang.reflect.Method.invoke (Method.java:498) в
py4j.reflection.MethodInvoker.invoke (MethodInvoker.java:244) вpy4j.reflection.ReflectionEngine.invoke (ReflectionEngine.java:357) в py4j.Gateway.invoke (Gateway.java:280) в py4j.commands.AbstractCommand.invokeMethod (AbstractCommand.java:132) в py4j.command.CecCom(CallCommand.java:79) в py4j.GatewayConnection.run (GatewayConnection.java:214) в java.lang.Thread.run (Thread.java:748). Вызывается: org.apache.spark.api.python.PythonException:Traceback (последний вызов был последним): файл "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", строка 177, в файле основного процесса () "/ usr / lib / spark / python/lib/pyspark.zip/pyspark/worker.py ", строка 172, в процессе serializer.dump_stream (func (split_index, iterator), outfile) Файл" /usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py ", строка 104, в func = lambda _, it: файл map (mapper, it)" ", строка 1, в файле" /usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py ", строка 71, взамен лямбда * a: f (* a) файл" ", строка 5, в findClosestPreviousDate TypeError: объект 'NoneType' не повторяется
в org.apache.spark.api.python.PythonRunner $$ anon $ 1.read (PythonRDD.scala: 193) в org.apache.spark.api.python.PythonRunner $$ anon $ 1. (PythonRDD.scala: 234) в org.apache.spark.api.python.PythonRunner.compute (PythonRDD.scala: 152) в org.apache.spark.sql.execution.python.BatchEvalPythonExec $$ anonfun $ doExecute $ 1.apply (BatchEvalPyExEx.scala: 144) в org.apache.spark.sql.execution.python.BatchEvalPythonExec $$ anonfun $ doExecute $ 1.apply (BatchEvalPythonExec.scala: 87) в org.apache.spark.rdd.RDD $$ anonfun $ mapPartitions $ 1$$ anonfun $ apply $ 23.apply (RDD.scala: 797) в org.apache.spark.rdd.RDD $$ anonfun $ mapPartitions $ 1 $$ anonfun $ apply $ 23.apply (RDD.scala: 797) в org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 38) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 323) в org.apache.spark.rdd.RDD.iterator (RDD.scala: 287) в org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 38) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 323) вorg.apache.spark.rdd.RDD.iterator (RDD.scala: 287) в org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 38) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 323) в org.apache.spark.rdd.RDD.iterator (RDD.scala: 287) в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 87) в org.apache.spark.scheduler.Task.run (Task.scala: 108) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 338) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) ... еще 1