Разница между датами в PySpark SQL - PullRequest
0 голосов
/ 16 ноября 2018

Так что мне нужно вычислить разницу между двумя датами.Я знаю, что PySpark SQL поддерживает DATEDIFF, но только на день.Я сделал функцию, которая вычисляет разницу, но у меня просто нет вывода.Код выглядит так:

     ...
logRowsDF.createOrReplaceTempView("taxiTable")
#first way
spark.registerFunction("test", lambda x,y: ((dt.strptime(x, '%Y-%m-%d %H:%M:%S') - dt.strptime(y, '%Y-%m-%d %H:%M:%S')).days * 24 * 60) + ((dt.strptime(x, '%Y-%m-%d %H:%M:%S') - dt.strptime(y, '%Y-%m-%d %H:%M:%S')).seconds/60))
#second
spark.registerFunction("test", lambda x,y: countTime(x,y))
#third
diff = udf(countTime)
#trying to call that function that way
listIpsDF = spark.sql('SELECT diff(pickup,dropoff) AS TIME FROM taxiTable')

Функция:

def countTime(time1, time2):
    fmt = '%Y-%m-%d %H:%M:%S'
    d1 = dt.strptime(time1, fmt)
    d2 = dt.strptime(time2, fmt)
    diff = d2 -d1
    diff_minutes = (diff.days * 24 * 60) + (diff.seconds/60)
    return str(diff_minutes)

Это просто не работает.Вы можете мне помочь?

Пример:

+-------------------+-------------------+
|             pickup|            dropoff|
+-------------------+-------------------+
|2018-01-01 00:21:05|2018-01-01 00:24:23|
|2018-01-01 00:44:55|2018-01-01 01:03:05|
|                  ...                  |
+-------------------+-------------------+

Ожидаемый результат (в минутах):

+-------------------+
|    datediff       |
+-------------------+
|        3.3        |
| 18.166666666666668|
|        ...        |
+-------------------+

1 Ответ

0 голосов
/ 16 ноября 2018

На самом деле я не уверен, где ваша ошибка, потому что некоторые из вашего примера кода не имеют смысла (например, вы регистрируете функцию с именем 'test', но вы используете функцию diff в своем выражении sql, которое не зарегистрировано -> Это должно привести к сообщению об ошибке).В любом случае, пожалуйста, найдите рабочий пример вашего кода ниже:

from pyspark.sql.functions import udf
from datetime import datetime as dt

l = [('2018-01-01 00:21:05','2018-01-01 00:24:23')
,('2018-01-01 00:44:55', '2018-01-01 01:03:05')
]

df = spark.createDataFrame(l,['begin','end'])
df.registerTempTable('test')

def countTime(time1, time2):
    fmt = '%Y-%m-%d %H:%M:%S'
    d1 = dt.strptime(time1, fmt)
    d2 = dt.strptime(time2, fmt)
    diff = d2 - d1
    diff_minutes = (diff.days * 24 * 60) + (diff.seconds/60)
    return str(diff_minutes)

diff = udf(countTime)
sqlContext.registerFunction("diffSQL", lambda x, y: countTime(x,y))

print('column expression udf works')
df.withColumn('bla', diff(df.begin,df.end)).show()
print('sql udf works')
spark.sql('select diffSQL(begin,end) from test').show()

Вывод примера:

column expression udf works
+-------------------+-------------------+------------------+ 
|              begin|                end|               bla| 
+-------------------+-------------------+------------------+ 
|2018-01-01 00:21:05|2018-01-01 00:24:23|               3.3| 
|2018-01-01 00:44:55|2018-01-01 01:03:05|18.166666666666668| 
+-------------------+-------------------+------------------+ 
sql udf works 
+-------------------+ 
|diffSQL(begin, end)| 
+-------------------+ 
|                3.3| 
| 18.166666666666668|
+-------------------+
...