You can try setting up an array shift/rotate function and then use F.udf():
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, LongType
df = spark.createDataFrame(
    [(1,1,2,3,4,1), (2,5,6,7,8,3), (3,9,10,11,12,2), (4,13,14,15,16,0), (5,17,18,19,20,5)],
    ['Id', 'Col1', 'Col2', 'Col3', 'Col4', 'shift']
)
df.printSchema()
#root
# |-- Id: long (nullable = true)
# |-- Col1: long (nullable = true)
# |-- Col2: long (nullable = true)
# |-- Col3: long (nullable = true)
# |-- Col4: long (nullable = true)
# |-- shift: long (nullable = true)
# column names to shift/rotate
cols = df.columns[1:-1]
#['Col1', 'Col2', 'Col3', 'Col4']
# rotate the array left by n positions; n may exceed the array length
#@F.udf("array<long>")
def my_shift(arr, n):
    if n == 0:
        return arr
    arr_len = len(arr)
    return [arr[(i + n) % arr_len] for i in range(arr_len)]

shift_udf = F.udf(my_shift, ArrayType(LongType()))
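The commented-out decorator is the equivalent shorthand: instead of the explicit F.udf(...) call, you can register the function at definition time. A minimal sketch of that form (same rotation logic as my_shift):

@F.udf("array<long>")
def shift_udf(arr, n):
    if n == 0:
        return arr
    arr_len = len(arr)
    return [arr[(i + n) % arr_len] for i in range(arr_len)]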
# group the cols into an array, then run shift_udf(arr, shift) to form 'new_arr'
df_new = (df.withColumn('arr', F.array([F.col(c) for c in cols]))
    .withColumn('new_arr', shift_udf('arr', 'shift'))
    .select('Id', 'shift', 'arr', 'new_arr', *[F.col('new_arr')[i].alias(cols[i]) for i in range(len(cols))])
)
df_new.show()
#+---+-----+----------------+----------------+----+----+----+----+
#| Id|shift|             arr|         new_arr|Col1|Col2|Col3|Col4|
#+---+-----+----------------+----------------+----+----+----+----+
#| 1| 1| [1, 2, 3, 4]| [2, 3, 4, 1]| 2| 3| 4| 1|
#| 2| 3| [5, 6, 7, 8]| [8, 5, 6, 7]| 8| 5| 6| 7|
#| 3| 2| [9, 10, 11, 12]| [11, 12, 9, 10]| 11| 12| 9| 10|
#| 4| 0|[13, 14, 15, 16]|[13, 14, 15, 16]| 13| 14| 15| 16|
#| 5| 5|[17, 18, 19, 20]|[18, 19, 20, 17]| 18| 19| 20| 17|
#+---+-----+----------------+----------------+----+----+----+----+
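If you are on Spark 2.4+, a UDF is not strictly needed: the built-in slice, concat and size functions can express the same left rotation. A sketch under that version assumption (df_builtin is a hypothetical name; the semantics match my_shift, including shift values larger than the array size):

df_builtin = (df.withColumn('arr', F.array([F.col(c) for c in cols]))
    # normalize the shift so it wraps around the array length
    .withColumn('n', F.expr('shift % size(arr)'))
    # tail of the array (from n+1) followed by the head (first n elements)
    .withColumn('new_arr', F.expr('concat(slice(arr, n + 1, size(arr) - n), slice(arr, 1, n))'))
)

Staying with built-ins avoids the Python serialization overhead of a UDF, which can matter on large DataFrames.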