Создайте столбец Df с диапазоном чисел, не используя UDF - PullRequest
2 голосов
/ 12 февраля 2020

В Spark 2.4.3 с python как мне создать новый столбец с диапазоном чисел? Я работал с UDF, но предпочел бы этого не делать. Вот код.

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, explode, udf, col, minute, second
from pyspark.sql.types import ArrayType, IntegerType

def seconds_range(start_date,end_date):
    start_seconds = start_date.minute * 60 + start_date.second
    end_seconds = end_date.minute * 60 + end_date.second
    return list(range(start_seconds, end_seconds+1))

spark = SparkSession.builder.appName('MyApp').master('local[2]').getOrCreate()

# register udf function with spark
seconds_range_udf = udf(seconds_range, ArrayType(IntegerType()))


# create dataframe with sample data.
df1 = spark.createDataFrame([('user1', '2019-12-01 9:02:30', '2019-12-01 09:04:00'),\
    ('user2', '2019-12-01 9:02:30', '2019-12-01 09:04:00'),\
    ('user3', '2019-12-01 9:03:23', '2019-12-01 09:03:50')],\
    ['user', 'login_start_dt', 'login_end_dt'])

df1 = df1.\
    withColumn('user', df1.user).\
    withColumn('login_start_dt', to_timestamp(df1.login_start_dt , 'yyyy-MM-dd HH:mm:ss')).\
    withColumn('login_end_dt', to_timestamp(df1.login_end_dt, 'yyyy-MM-dd HH:mm:ss'))

df2 = df1.\
    withColumn('login_offset', (minute(df1.login_start_dt) * 60 + (second(df1.login_start_dt))).cast(IntegerType())).\
    withColumn('logout_offset', (minute(df1.login_end_dt) * 60 + (second(df1.login_end_dt))).cast(IntegerType())).\
#     withColumn('arr_logged_seconds', seconds_range_udf('login_start_dt ', 'login_end_dt'))  # this line works
    withColumn('arr_logged_seconds', list(range(col('login_offset'), col('logout_offset'))))  # would like to get this line to work.

df2.show()

Я получаю сообщение об ошибке «Объект« Столбец »не может быть интерпретирован как целое число». Я также хочу убедиться, что я могу добавить эту дополнительную секунду, поскольку «диапазон» исключает второй параметр.

Спасибо

1 Ответ

1 голос
/ 12 февраля 2020

IIU C, начиная с spark> 2.4 и далее, вы можете использовать функцию sequence для достижения того же самого.

import  pyspark.sql.functions as f


df2 = df1.\
     withColumn('login_offset', (minute(df1.login_start_dt) * 60 + (second(df1.login_start_dt))).cast(IntegerType())).\
     withColumn('logout_offset', (minute(df1.login_end_dt) * 60 + (second(df1.login_end_dt))).cast(IntegerType())).\
     withColumn('arr_logged_seconds', f.sequence('login_offset', 'logout_offset')) # sequence(start, stop, step=None) : step = 1 default

df2.show()
+-----+-------------------+-------------------+------------+-------------+--------------------+
| user|     login_start_dt|       login_end_dt|login_offset|logout_offset|  arr_logged_seconds|
+-----+-------------------+-------------------+------------+-------------+--------------------+
|user1|2019-12-01 09:02:30|2019-12-01 09:04:00|         150|          240|[150, 151, 152, 1...|
|user2|2019-12-01 09:02:30|2019-12-01 09:04:00|         150|          240|[150, 151, 152, 1...|
|user3|2019-12-01 09:03:23|2019-12-01 09:03:50|         203|          230|[203, 204, 205, 2...|
+-----+-------------------+-------------------+------------+-------------+--------------------+

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...