Вот несколько методов, которые вы можете попробовать:
Метод-1: строковые функции: повтор , подстрока
- рассчитать количество повторений
n = ceil(session_duration/24)
- создать строку
a
, которая повторяет подстроку 8,
для n
раз, а затем использовать substring () или regexp_replace () , чтобы удалить запятую ,
- разделите
a
запятой и затем выделите ее в строки pos
и session_duration
- отрегулируйте start_date на
pos
из вышеприведенного шага
- приведите строку
session_duration
в double
см. Пример кода ниже:
from pyspark.sql import functions as F
# assume the columns in your dataframe are read with proper data types
# for example using inferSchema=True
df = spark.read.csv('/path/to/file', header=True, inferSchema=True)
df1 = df.withColumn('n', F.ceil(F.col('session_duration')/24).astype('int')) \
.withColumn('a', F.when(F.col('n')>1, F.expr('substring(repeat("8,",n),0,2*n-1)')).otherwise(F.col('session_duration')))
>>> df1.show()
+---------------+-----------+----------------+-----------------+-------------------+---+-------+
|client_username|workstation|session_duration|access_point_name| start_date| n| a|
+---------------+-----------+----------------+-----------------+-------------------+---+-------+
| XX1@AD| Apple| 1.55| idf_1|2019-06-01 00:00:00| 1| 1.55|
| XX2@AD| Apple| 30.12| idf_2|2019-06-04 00:00:00| 2| 8,8|
| XX3@AD| Apple| 78.25| idf_3|2019-06-02 00:00:00| 4|8,8,8,8|
| XX4@AD| Apple| 0.45| idf_1|2019-06-02 00:00:00| 1| 0.45|
| XX1@AD| Apple| 23.11| idf_1|2019-06-02 00:00:00| 1| 23.11|
+---------------+-----------+----------------+-----------------+-------------------+---+-------+
df_new = df1.select(
'client_username'
, 'workstation'
, F.posexplode(F.split('a', ',')).alias('pos', 'session_duration')
, 'access_point_name'
, F.expr('date_add(start_date, pos)').alias('start_date')
).drop('pos')
>>> df_new.show()
+---------------+-----------+----------------+-----------------+----------+
|client_username|workstation|session_duration|access_point_name|start_date|
+---------------+-----------+----------------+-----------------+----------+
| XX1@AD| Apple| 1.55| idf_1|2019-06-01|
| XX2@AD| Apple| 8| idf_2|2019-06-04|
| XX2@AD| Apple| 8| idf_2|2019-06-05|
| XX3@AD| Apple| 8| idf_3|2019-06-02|
| XX3@AD| Apple| 8| idf_3|2019-06-03|
| XX3@AD| Apple| 8| idf_3|2019-06-04|
| XX3@AD| Apple| 8| idf_3|2019-06-05|
| XX4@AD| Apple| 0.45| idf_1|2019-06-02|
| XX1@AD| Apple| 23.11| idf_1|2019-06-02|
+---------------+-----------+----------------+-----------------+----------+
Вышеприведенный код также можно записать в одну цепочку:
df_new = df.withColumn('n'
, F.ceil(F.col('session_duration')/24).astype('int')
).withColumn('a'
, F.when(F.col('n')>1, F.expr('substring(repeat("8,",n),0,2*n-1)')).otherwise(F.col('session_duration'))
).select('client_username'
, 'workstation'
, F.posexplode(F.split('a', ',')).alias('pos', 'session_duration')
, 'access_point_name'
, F.expr('date_add(start_date, pos)').alias('start_date')
).withColumn('session_duration'
, F.col('session_duration').astype('double')
).drop('pos')
Метод-2: функция массива array_repeat (pyspark 2.4 +)
Аналогично методу 1, но a
уже является массивом, поэтому нет необходимости разбивать строку на массив:
df1 = df.withColumn('n', F.ceil(F.col('session_duration')/24).astype('int')) \
.withColumn('a', F.when(F.col('n')>1, F.expr('array_repeat(8,n)')).otherwise(F.array('session_duration')))
>>> df1.show()
+---------------+-----------+----------------+-----------------+-------------------+---+--------------------+
|client_username|workstation|session_duration|access_point_name| start_date| n| a|
+---------------+-----------+----------------+-----------------+-------------------+---+--------------------+
| XX1@AD| Apple| 1.55| idf_1|2019-06-01 00:00:00| 1| [1.55]|
| XX2@AD| Apple| 30.12| idf_2|2019-06-04 00:00:00| 2| [8.0, 8.0]|
| XX3@AD| Apple| 78.25| idf_3|2019-06-02 00:00:00| 4|[8.0, 8.0, 8.0, 8.0]|
| XX4@AD| Apple| 0.45| idf_1|2019-06-02 00:00:00| 1| [0.45]|
| XX1@AD| Apple| 23.11| idf_1|2019-06-02 00:00:00| 1| [23.11]|
+---------------+-----------+----------------+-----------------+-------------------+---+--------------------+
df_new = df1.select('client_username'
, 'workstation'
, F.posexplode('a').alias('pos', 'session_duration')
, 'access_point_name'
, F.expr('date_add(start_date, pos)').alias('start_date')
).drop('pos')