обработка исключений UDF для pyspark dataframe - PullRequest
0 голосов
/ 06 мая 2018

Я написал один UDF для использования в spark с использованием python. Эта функция занимает одна дата (в строке, например, «2017-01-06») и один массив строк (например: [2017-01-26, 2017-02-26, 2017-04-17]) и вернуть #days с последней ближайшей даты. UDF составляет

def findClosestPreviousDate(currdate, date_list):
    date_format = "%Y-%m-%d"
    currdate = datetime.datetime.strptime(currdate, date_format)
    result = currdate
    date_list = [datetime.datetime.strptime(x, date_format) for x in date_list if x != None]
    lowestdiff = 10000
    for dt in date_list:
        if(dt >= currdate):
            continue
        delta = currdate-dt
        diff = delta.days
        if(diff < lowestdiff):
            lowestdiff = diff
            result = dt
    dlt = currdate-result
    return dlt.days


findClosestPreviousDateUdf = udf(findClosestPreviousDate,StringType())

Я звоню, как показано ниже

findClosestPreviousDateUdf = udf(findClosestPreviousDate,StringType())
grouped_extend_df2 = grouped_extend_email_rec.withColumn('recency_eng', func.when(size(col("activity_arr")) > 0, findClosestPreviousDateUdf("expanded_datestr", "activity_arr")).otherwise(0))

Даже если я удаляю все пустые значения в столбце «activity_arr», я получаю эту ошибку NoneType . Пробовал применять обработку исключений внутри функции (все то же самое).

Есть ли у нас лучший способ отловить записи с ошибками во время выполнения из UDF (возможно, с использованием аккумулятора или около того, я видел, что мало кто пробовал то же самое с использованием scala)

ОШИБКА:

----------------------------------------------- ---------------------------- Py4JJavaError Traceback (последний вызов последний) в () ----> 1 grouped_extend_df2.show ()

/ usr / lib / spark / python / pyspark / sql / dataframe.pyc в шоу (self, n, усечение) 334 "" " 335 если isinstance (усечь, bool) и усечь: -> 336 отпечатков (self._jdf.showString (n, 20)) Еще 337: 338 print (self._jdf.showString (n, int (truncate)))

/ usr / lib / spark / python / lib / py4j-0.10.4-src.zip / py4j / java_gateway.py в вызов (self, * args) 1131 answer = self.gateway_client.send_command (команда) 1132 return_value = get_return_value ( -> 1133 ответа, self.gateway_client, self.target_id, self.name) 1134 1135 для temp_arg в temp_args:

/ usr / lib / spark / python / pyspark / sql / utils.pyc in deco (* a, ** kw) 61 def deco (*, ** кВт): 62 попробуйте: ---> 63 возврата f (* a, ** кВт) 64 за исключением py4j.protocol.Py4JJavaError как e: 65 с = e.java_exception.toString ()

/ usr / lib / spark / python / lib / py4j-0.10.4-src.zip / py4j / protocol.py в get_return_value (ответ, gateway_client, target_id, name) 317 поднять Py4JJavaError ( 318 "Произошла ошибка при вызове {0} {1} {2}. \ N". -> формат 319 (target_id, ".", Name), значение) Еще 320: 321 рейз Py4JError (

Py4JJavaError: Произошла ошибка при вызове o1111.showString. : org.apache.spark.SparkException: задание прервано из-за сбоя этапа: Задача 0 на этапе 315.0 не выполнена 1 раз, последний сбой: потерянная задача 0,0 на этапе 315.0 (TID 18390, localhost, драйвер исполнителя): org.apache.spark.api.python.PythonException: обратная связь (самая последняя последний звонок): File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", строка 177, в основном Файл process () "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", строка 172, в процессе serializer.dump_stream (func (split_index, iterator), выходной файл) Файл "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", строка 104, в func = lambda _, it: файл map (mapper, it) "", строка 1, в файле "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", строка 71, в return lambda * a: f (* a) Файл "", строка 5, в findClosestPreviousDate TypeError: объект 'NoneType' не является итерацию

в org.apache.spark.api.python.PythonRunner $$ Анон $ 1.read (PythonRDD.scala: 193) в org.apache.spark.api.python.PythonRunner $$ Анон $ 1 (PythonRDD.scala: 234). в org.apache.spark.api.python.PythonRunner.compute (PythonRDD.scala: 152) в org.apache.spark.sql.execution.python.BatchEvalPythonExec $$ anonfun $ doExecute $ 1.Apply (BatchEvalPythonExec.scala: 144) в org.apache.spark.sql.execution.python.BatchEvalPythonExec $$ anonfun $ doExecute $ 1.Apply (BatchEvalPythonExec.scala: 87) в org.apache.spark.rdd.RDD $$ anonfun $ mapPartitions $ 1 $$ anonfun $ применять $ 23.apply (RDD.scala: 797) вorg.apache.spark.rdd.RDD $$ anonfun $ mapPartitions $ 1 $$ anonfun $ применять $ 23.apply (RDD.scala: 797) в org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 38) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 323) в org.apache.spark.rdd.RDD.iterator (RDD.scala: 287) в org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 38) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 323) в org.apache.spark.rdd.RDD.iterator (RDD.scala: 287) в org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 38) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 323) в org.apache.spark.rdd.RDD.iterator (RDD.scala: 287) в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 87) в org.apache.spark.scheduler.Task.run (Task.scala: 108) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 338) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) at java.lang.Thread.run (Thread.java:748)

трассировка стека драйверов: в org.apache.spark.scheduler.DAGScheduler.org $ апача $ искры $ планировщик $ DAGScheduler $$ failJobAndIndependentStages (DAGScheduler.scala: 1517) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.Apply (DAGScheduler.scala: 1505) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.Apply (DAGScheduler.scala: 1504) в scala.collection.mutable.ResizableArray $ class.foreach (ResizableArray.scala: 59) в scala.collection.mutable.ArrayBuffer.foreach (ArrayBuffer.scala: 48) в org.apache.spark.scheduler.DAGScheduler.abortStage (DAGScheduler.scala: +1504) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.Apply (DAGScheduler.scala: 814) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.Apply (DAGScheduler.scala: 814) в scala.Option.foreach (Option.scala: 257) в org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed (DAGScheduler.scala: 814) в org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive (DAGScheduler.scala: 1732) в org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive (DAGScheduler.scala: 1687) в org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive (DAGScheduler.scala: 1676) в org.apache.spark.util.EventLoop $$ anon $ 1.run (EventLoop.scala: 48) в org.apache.spark.scheduler.DAGScheduler.runJob (DAGScheduler.scala: 630) в org.apache.spark.SparkContext.runJob (SparkContext.scala: 2029) в org.apache.spark.SparkContext.runJob (SparkContext.scala: 2050) в org.apache.spark.SparkContext.runJob (SparkContext.scala: 2069) в org.apache.spark.sql.execution.SparkPlan.executeTake (SparkPlan.scala: 336) в org.apache.spark.sql.execution.CollectLimitExec.executeCollect (limit.scala: 38) в org.apache.spark.sql.Dataset.org $ апача $ искры $ SQL $ Dataset $$ collectFromPlan (Dataset.scala: 2861) в org.apache.spark.sql.Dataset $$ anonfun $ головы $ 1.Apply (Dataset.scala: 2150) в org.apache.spark.sql.Dataset $$ anonfun $ головы $ 1.Apply (Dataset.scala: 2150) в org.apache.spark.sql.Dataset $$ anonfun $ 55.apply (Dataset.scala: 2842) в org.apache.spark.sql.execution.SQLExecution $ .withNewExecutionId (SQLExecution.scala: 65) в org.apache.spark.sql.Dataset.withAction (Dataset.scala: 2841) в org.apache.spark.sql.Dataset.head (Dataset.scala: 2150) в org.apache.spark.sql.Dataset.take (Dataset.scala: 2363) в org.apache.spark.sql.Dataset.showString (Dataset.scala: 241) в sun.reflect.GeneratedMethodAccessor237.invoke (неизвестный источник) в sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) в java.lang.reflect.Method.invoke (Method.java:498) в py4j.reflection.MethodInvoker.invoke (MethodInvoker.java:244) вpy4j.reflection.ReflectionEngine.invoke (ReflectionEngine.java:357) в py4j.Gateway.invoke (Gateway.java:280) в py4j.commands.AbstractCommand.invokeMethod (AbstractCommand.java:132) в py4j.command.CecCom(CallCommand.java:79) в py4j.GatewayConnection.run (GatewayConnection.java:214) в java.lang.Thread.run (Thread.java:748). Вызывается: org.apache.spark.api.python.PythonException:Traceback (последний вызов был последним): файл "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", строка 177, в файле основного процесса () "/ usr / lib / spark / python/lib/pyspark.zip/pyspark/worker.py ", строка 172, в процессе serializer.dump_stream (func (split_index, iterator), outfile) Файл" /usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py ", строка 104, в func = lambda _, it: файл map (mapper, it)" ", строка 1, в файле" /usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py ", строка 71, взамен лямбда * a: f (* a) файл" ", строка 5, в findClosestPreviousDate TypeError: объект 'NoneType' не повторяется

в org.apache.spark.api.python.PythonRunner $$ anon $ 1.read (PythonRDD.scala: 193) в org.apache.spark.api.python.PythonRunner $$ anon $ 1. (PythonRDD.scala: 234) в org.apache.spark.api.python.PythonRunner.compute (PythonRDD.scala: 152) в org.apache.spark.sql.execution.python.BatchEvalPythonExec $$ anonfun $ doExecute $ 1.apply (BatchEvalPyExEx.scala: 144) в org.apache.spark.sql.execution.python.BatchEvalPythonExec $$ anonfun $ doExecute $ 1.apply (BatchEvalPythonExec.scala: 87) в org.apache.spark.rdd.RDD $$ anonfun $ mapPartitions $ 1$$ anonfun $ apply $ 23.apply (RDD.scala: 797) в org.apache.spark.rdd.RDD $$ anonfun $ mapPartitions $ 1 $$ anonfun $ apply $ 23.apply (RDD.scala: 797) в org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 38) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 323) в org.apache.spark.rdd.RDD.iterator (RDD.scala: 287) в org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 38) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 323) вorg.apache.spark.rdd.RDD.iterator (RDD.scala: 287) в org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 38) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 323) в org.apache.spark.rdd.RDD.iterator (RDD.scala: 287) в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 87) в org.apache.spark.scheduler.Task.run (Task.scala: 108) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 338) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) ... еще 1

Ответы [ 2 ]

0 голосов
/ 07 мая 2018

думаю разобрался с проблемой. Вот мой модифицированный UDF.

def findClosestPreviousDate(currdate, date_str):
    date_format = "%Y-%m-%d"
    currdate = datetime.datetime.strptime(currdate, date_format)
    date_list = ''
    result = currdate
    if date_str is None:
        return date_str
    else:
        date_list = date_str.split('|')
    date_list = [datetime.datetime.strptime(x, date_format) for x in date_list if x != None]
    lowestdiff = 10000
    for dt in date_list:
        if(dt >= currdate):
            continue
        delta = currdate-dt
        diff = delta.days
        if(diff < lowestdiff):
            lowestdiff = diff
            result = dt
    dlt = currdate-result
    return dlt.days

Ошибка NoneType произошла из-за того, что нулевые значения попали в UDF в качестве параметров, которые я знал. Интересно, почему нулевые значения не отфильтровываются, когда я использую функцию isNotNull ().

пробовал оба

findClosestPreviousDateUdf = udf(findClosestPreviousDate,StringType())
grouped_extend_df2 = grouped_extend_email_rec.withColumn('recency_eng', func.when(size(col("activity_arr")) > 0, findClosestPreviousDateUdf("expanded_datestr", "activity_arr")).otherwise(0))

и

findClosestPreviousDateUdf = udf(findClosestPreviousDate,StringType())
grouped_extend_df2 = grouped_extend_email_rec.withColumn('recency_eng', func.when(col("activity_arr").isNotNull(), findClosestPreviousDateUdf("expanded_datestr", "activity_arr")).otherwise(0))

Однако, когда я передал NoneType в функции Python выше в функции findClosestPreviousDate (), как показано ниже

if date_str is None:
    return date_str
else:
    date_list = date_str.split('|')

это сработало.

0 голосов
/ 07 мая 2018

Я попробовал ваш udf, но он постоянно возвращает 0 (int).

dlt = currdate-result # result and currdate are same
return dlt.days # days is int type

Но при создании udf вы указали StringType.

findClosestPreviousDateUdf = udf(findClosestPreviousDate,StringType())

Следовательно, я изменилfindClosestPreviousDate функция, пожалуйста, внесите изменения, если необходимо.

>>> in_dates = ['2017-01-26', '2017-02-26', '2017-04-17']
>>>
>>> def findClosestPreviousDate(currdate, date_list=in_dates):
...     date_format = "%Y-%m-%d"
...     currdate = datetime.datetime.strptime(currdate, date_format)
...     date_list = [datetime.datetime.strptime(x, date_format) for x in date_list if x != None]
...     diff = map(lambda dt: (currdate - dt).days, date_list)
...     closestDate = min(filter(lambda days_diff: days_diff <= 0, diff))
...     return closestDate if closestDate else 0
...
>>> findClosestPreviousDate('2017-01-06')
-101

Также сделал тип возврата udf как IntegerType.С этими изменениями код работает, но, пожалуйста, проверьте правильность изменений.PySpark udfs может принимать только один аргумент, есть обходной путь, см. PySpark - список передачи в качестве параметра для UDF

>>> df.show()
+----------+
|      date|
+----------+
|2017-01-06|
|2017-01-08|
+----------+

>>>
>>> in_dates = ['2017-01-26', '2017-02-26', '2017-04-17']
>>> def findClosestPreviousDate(currdate, date_list=in_dates):
...     date_format = "%Y-%m-%d"
...     currdate = datetime.datetime.strptime(currdate, date_format)
...     date_list = [datetime.datetime.strptime(x, date_format) for x in date_list if x != None]
...     diff = map(lambda dt: (currdate - dt).days, date_list)
...     closestDate = min(filter(lambda days_diff: days_diff <= 0, diff))
...     return closestDate if closestDate else 0
...
>>> findClosestPreviousDate('2017-01-06')
-101
>>>
>>> from pyspark.sql.types import IntegerType
>>> findClosestPreviousDateUDF = udf(findClosestPreviousDate, IntegerType())
>>> df.withColumn('closest_date', findClosestPreviousDateUDF(df['date'])).show()
+----------+------------+
|      date|closest_date|
+----------+------------+
|2017-01-06|        -101|
|2017-01-08|         -99|
+----------+------------+

Надеюсь, это поможет!

...