I used the reduce function to operate over the original dataframe (this is a copy-pasteable example):
from functools import reduce

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import row_number, lit, col, when

# .toDF() on an RDD needs an active SparkSession, so build one instead of a bare SparkContext
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
step = 4  # rows per block
df = sc.parallelize([['2019-08-29 01:00:00'],
                     ['2019-08-29 02:00:00'],
                     ['2019-08-29 03:00:00'],
                     ['2019-08-29 04:00:00'],
                     ['2019-08-29 05:00:00'],
                     ['2019-08-29 06:00:00'],
                     ['2019-08-29 07:00:00'],
                     ['2019-08-29 08:00:00'],
                     ['2019-08-29 09:00:00'],
                     ['2019-08-29 10:00:00']]) \
    .toDF(['DATETIME']) \
    .withColumn('DATETIME', col('DATETIME').cast('timestamp'))
df = df.withColumn('tmp_num', row_number().over(Window.orderBy('DATETIME')))  # global row number
df = df.withColumn('num', lit('A'))  # temporary placeholder value
rows = df.count()
idxs = list(range(1, rows, step))  # first tmp_num of every block

# The numbering logic for a single block
def operate_df(df_, idx):
    # mark the rows of the current block and restart the counter inside it
    return df_.select(df_.DATETIME,
                      df_.num,
                      df_.tmp_num,
                      df_.tmp_num.between(idx, idx + step - 1).alias('block')) \
              .withColumn('num',
                          when(col('block'),
                               # order by tmp_num so the numbering inside the block is deterministic
                               row_number().over(Window.partitionBy('block').orderBy('tmp_num')))
                          .otherwise(col('num')))
result_df = reduce(operate_df, idxs, df)
result_df = result_df.drop('tmp_num', 'block')  # drop the auxiliary columns
result_df.show()  # show() prints the dataframe itself, no print() needed
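Here reduce(operate_df, idxs, df) simply threads the dataframe through operate_df once per block start index in idxs, so each pass renumbers one block of step consecutive rows and leaves the rest untouched.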
This gives me the following output:
+-------------------+---+
| DATETIME|num|
+-------------------+---+
|2019-08-29 01:00:00| 1|
|2019-08-29 02:00:00| 2|
|2019-08-29 03:00:00| 3|
|2019-08-29 04:00:00| 4|
|2019-08-29 05:00:00| 1|
|2019-08-29 06:00:00| 2|
|2019-08-29 07:00:00| 3|
|2019-08-29 08:00:00| 4|
|2019-08-29 09:00:00| 1|
|2019-08-29 10:00:00| 2|
+-------------------+---+
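For this specific pattern (a counter that restarts every step rows) the reduce loop can be avoided entirely. Here is a minimal sketch, assuming only the restarting counter is needed and reusing the df and step defined above (alt_df and rn are just illustrative names):

from pyspark.sql import Window
from pyspark.sql.functions import row_number, col

# number all rows 1..N by DATETIME, then wrap the counter every `step` rows
alt_df = df.select('DATETIME') \
    .withColumn('rn', row_number().over(Window.orderBy('DATETIME'))) \
    .withColumn('num', (col('rn') - 1) % step + 1) \
    .drop('rn')
alt_df.show()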