Первое создание примера кадра данных,
import pyspark.sql.functions as F
from pyspark.sql.types import *
df = sql.createDataFrame([
(1, 1, 'v1' , 'l1'),
(2, 1, 'v1' , 'v1'),
(3, 1, 'v1' , 'l2'),
(4, 2, 'v2' , 'v2'),
(5, 2, 'v2' , 'l3'),
(6, 3, 'v3' , 'l3'),
],[
'rowid', 'id', 'k1', 'k2'])
Затем создайте udf и примените его к столбцам,
def get_rank_udf(rows):
rows = sorted(rows, key=lambda x: x['rowid'])
first_row_id = rows[0]['rowid']
for _r in rows:
if _r['k1'] == _r['k2']:
equal_row_id = _r['rowid']
break
else:
equal_row_id = None
if equal_row_id is None:
return 0
return equal_row_id - first_row_id + 1
get_rank = F.udf(lambda x: get_rank_udf(x), IntegerType())
df = df.groupby('id', 'k1').agg(F.collect_list(F.struct('rowid', 'k1', 'k2')).alias('elements'))\
.withColumn('rank', get_rank(F.col('elements')))\
.select('id', 'k1', 'rank')
Это дает вывод,
+---+---+----+
| id| k1|rank|
+---+---+----+
| 1| v1| 2|
| 2| v2| 1|
| 3| v3| 0|
+---+---+----+