pyspark заменяет отрицательное значение на ноль - PullRequest
0 голосов
/ 24 октября 2018

Я могу попросить помощи о замене отрицательного значения с разного между отметкой времени на ноль.Работает на python3 на спарке.Вот мой код:

код:

timeFmt = "yyyy-MM-dd HH:mm:ss"
time_diff_1 = when((col("time1").isNotNull()) &
                       (col("time2").isNotNull()),                      
                       (unix_timestamp('time2', format=timeFmt) - unix_timestamp('time1', format=timeFmt)) / 60
                      ).otherwise(lit(0))

time_diff_2 = when((col("time2").isNotNull()) & 
                       (col("time3").isNotNull()),                       
                       (unix_timestamp('time3', format=timeFmt) - unix_timestamp('time2', format=timeFmt)) / 60
                      ).otherwise(lit(0))

time_diff_3 = when((col("time3").isNotNull()) &                           
                       (col("time4").isNotNull()),                       
                       (unix_timestamp('time4', format=timeFmt) - unix_timestamp('time3', format=timeFmt)) / 60
                      ).otherwise(lit(0))

df = (df      
      .withColumn('time_diff_1', time_diff_1)      
      .withColumn('time_diff_2', time_diff_2)
      .withColumn('time_diff_3', time_diff_3)
     )


df = (df
      .withColumn('time_diff_1', when(col('time_diff_1') < 0, 0).otherwise(col('time_diff_1')))
      .withColumn('time_diff_2', when(col('time_diff_2') < 0, 0).otherwise(col('time_diff_2')))
      .withColumn('time_diff_3', when(col('time_diff_3') < 0, 0).otherwise(col('time_diff_3')))
     )

, когда я запускаю вышеуказанный код, я получаю ошибку.Вот ошибка:

Py4JJavaError: Произошла ошибка при вызове o1083.showString.: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 0 на этапе 56.0 не выполнена 4 раза, последний сбой: потерянная задача 0.3 на этапе 56.0 (TID 7246, fxhclxcdh8.dftz.local, executor 21): org.codehaus.janino.JaninoRuntimeException: не удалось скомпилировать: org.codehaus.janino.JaninoRuntimeException: код метода "apply_9 $ (Lorg / apache / spark / sql / катализатор / выражения / GeneratedClass $ SpecificUnsafeProjection; Lorg / apache / spark / sql /катализатор / InternalRow;) V "класса" org.apache.spark.sql.catalyst.expressions.GeneratedClass $ SpecificUnsafeProjection "выходит за пределы 64 КБ / * 001 / public java.lang.Object generate (ссылки на Object []){/ 002 / return new SpecificUnsafeProjection (ссылки);/ 003 /} / 004 / / 005 / class SpecificUnsafeProjection расширяет org.apache.spark.sql.catalyst.expressions.UnsafeProjection {/ 006 / / 007 / private Object [] ссылки;/ 008 / private boolean evalExprIsNull;/ 009 / private boolean evalExprValue;/ 010 /
private логический evalExpr1IsNull;/
011 / private boolean evalExpr1Value;/ 012 / private java.text.DateFormat formatter5;/ 013 / private java.text.DateFormat formatter8;/ 014 /
private java.text.DateFormat formatter12;/
015 / private java.text.DateFormat formatter13;/ 016 / private UTF8String.IntWrapper wrapper;/ 017 / private java.text.DateFormat formatter15;/ 018 / private java.text.DateFormat formatter18;/ 019 / private java.text.DateFormat formatter19;/ 020 / private java.text.DateFormat formatter23;/ 021 / private java.text.DateFormat formatter26;/ 022 / private java.text.DateFormat formatter27;/ 023 / private java.text.DateFormat formatter30;/ 024 * / private java.text.DateFormat formatter32;........

Кто-нибудь может помочь?

1 Ответ

0 голосов
/ 24 октября 2018

Я думаю, что проще всего написать простой UDF (пользовательскую функцию) и применить его к нужному столбцу.Вот пример кода для этого:

import pyspark.sql.functions as f

correctNegativeDiff = f.udf(lambda diff: 0 if diff < 0 else diff, LongType())

df = df.withColumn('time_diff_1', correctNegativeDiff(df.time_diff_1))\
       .withColumn('time_diff_2', correctNegativeDiff(df.time_diff_2))\
       .withColumn('time_diff_3', correctNegativeDiff(df.time_diff_3))
...