Как создать столбец PySpark Dataframe, который перебирает круговой список - PullRequest
0 голосов
/ 20 января 2020

Как я могу добавить столбец в этот фрейм данных Pyspark, который проходит по циклическому списку примерно так:

   df = sc.parallelize([['2019-08-29 01:00:00'],
                              ['2019-08-29 02:00:00'],
                              ['2019-08-29 03:00:00'],
                              ['2019-08-29 04:00:00'],
                              ['2019-08-29 05:00:00'],
                              ['2019-08-29 06:00:00'],
                              ['2019-08-29 07:00:00'],
                              ['2019-08-29 08:00:00'],
                              ['2019-08-29 09:00:00'],
                              ['2019-08-29 10:00:00']]).toDF(['DATETIME']).withColumn('DATETIME',col('DATETIME').cast('timestamp'))

Желаемый результат:

+-------------------+---+
|           DATETIME|NUM|
+-------------------+---+
|2019-08-29 01:00:00|  1|
|2019-08-29 02:00:00|  2|
|2019-08-29 03:00:00|  3|
|2019-08-29 04:00:00|  4|
|2019-08-29 05:00:00|  1|
|2019-08-29 06:00:00|  2|
|2019-08-29 07:00:00|  3|
|2019-08-29 08:00:00|  4|
|2019-08-29 09:00:00|  1|
|2019-08-29 10:00:00|  2|
+-------------------+---+

Большое спасибо

Ответы [ 2 ]

0 голосов
/ 20 января 2020

Я использовал функцию Reduce для работы с исходным фреймом данных (это пример copypastable ):

from functools import reduce

from pyspark import SparkContext
from pyspark.sql import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import lit
from pyspark.sql.functions import col
from pyspark.sql.functions import when

sc = SparkContext(__file__)
step = 4  # rows qty

df = sc.parallelize([['2019-08-29 01:00:00'],
                    ['2019-08-29 02:00:00'],
                    ['2019-08-29 03:00:00'],
                    ['2019-08-29 04:00:00'],
                    ['2019-08-29 05:00:00'],
                    ['2019-08-29 06:00:00'],
                    ['2019-08-29 07:00:00'],
                    ['2019-08-29 08:00:00'],
                    ['2019-08-29 09:00:00'],
                    ['2019-08-29 10:00:00']])\
        .toDF(['DATETIME'])\
        .withColumn('DATETIME',col('DATETIME')\
        .cast('timestamp'))

df = df.withColumn('tmp_num', row_number().over(Window.orderBy('DATETIME')))
df = df.withColumn('num', lit('A'))  # temporal random value
rows = df.count()
idxs = list(range(1, rows, step))

# Logic is here
def operate_df(df_, idx):
    return df_.select(df_.DATETIME,
                      df_.num,
                      df_.tmp_num,
                      df_.tmp_num.between(idx, (idx + step - 1)).alias('block')
                      ).withColumn('num', when(col('block') == True, 
                                               row_number().over(Window.partitionBy('block').orderBy('block')))
                                               .otherwise(col('num')))

result_df = reduce(operate_df, idxs, df)
result_df = result_df.drop('tmp_num', 'block')  # drop auxiliar columns
print(result_df.show())

Это дает мне такой вывод:

+-------------------+---+
|           DATETIME|num|
+-------------------+---+
|2019-08-29 01:00:00|  1|
|2019-08-29 02:00:00|  2|
|2019-08-29 03:00:00|  3|
|2019-08-29 04:00:00|  4|
|2019-08-29 05:00:00|  1|
|2019-08-29 06:00:00|  2|
|2019-08-29 07:00:00|  3|
|2019-08-29 08:00:00|  4|
|2019-08-29 09:00:00|  1|
|2019-08-29 10:00:00|  2|
+-------------------+---+
0 голосов
/ 20 января 2020

Вы можете использовать row_number для генерации id , а затем использовать оператор модуля (%) для получения вращающихся идентификаторов:

from pyspark.sql.functions import col
from pyspark.sql import functions as f
from pyspark.sql.window import Window

w = Window.orderBy("DATETIME")

df
.withColumn("id", f.row_number().over(w))
.withColumn("NUM", (col("id") % 4) + 1)
.drop("id")
.show(100, False)

Вывод:

+-------------------+---+
|DATETIME           |NUM|
+-------------------+---+
|2019-08-29 01:00:00|2  |
|2019-08-29 02:00:00|3  |
|2019-08-29 03:00:00|4  |
|2019-08-29 04:00:00|1  |
|2019-08-29 05:00:00|2  |
|2019-08-29 06:00:00|3  |
|2019-08-29 07:00:00|4  |
|2019-08-29 08:00:00|1  |
|2019-08-29 09:00:00|2  |
|2019-08-29 10:00:00|3  |
|2019-08-29 11:00:00|4  |
|2019-08-29 12:00:00|1  |
|2019-08-29 13:00:00|2  |
|2019-08-29 14:00:00|3  |
|2019-08-29 15:00:00|4  |
|2019-08-29 16:00:00|1  |
|2019-08-29 17:00:00|2  |
|2019-08-29 18:00:00|3  |
|2019-08-29 19:00:00|4  |
|2019-08-29 20:00:00|1  |
|2019-08-29 21:00:00|2  |
|2019-08-29 22:00:00|3  |
|2019-08-29 23:00:00|4  |
|2019-08-30 00:00:00|1  |
|2019-08-30 01:00:00|2  |
|2019-08-30 02:00:00|3  |
|2019-08-30 03:00:00|4  |
|2019-08-30 04:00:00|1  |
+-------------------+---+
...