Надеюсь, это поможет.
Я создал новый столбец с идентификаторами TYP === PD. Я назвал это TYPPDID.
Затем я использовал оконную рамку в диапазоне от следующей строки до неограниченной следующей строки и получил первый ненулевой TYPPDID
orderBy("ID")
в конце концов, только чтобы показать записи в порядке.
import org.apache.spark.sql.functions._
val df = Seq(
("105771", "BRIMONIDINE", "PD"),
("105772", "BRIMONIDINE", "PD"),
("105773", "BRIMONIDINE","RV"),
("105774", "TIMOLOL", "RV"),
("105775", "BRIMONIDINE", "PD"),
("105776", "TIMOLOL", "PD")
).toDF("ID", "BFS", "TYP").withColumn("TYPPDID", when($"TYP" === "PD", $"ID"))
df: org.apache.spark.sql.DataFrame = [ID: string, BFS: string ... 2 more fields]
scala> df.show
+------+-----------+---+-------+
| ID| BFS|TYP|TYPPDID|
+------+-----------+---+-------+
|105771|BRIMONIDINE| PD| 105771|
|105772|BRIMONIDINE| PD| 105772|
|105773|BRIMONIDINE| RV| null|
|105774| TIMOLOL| RV| null|
|105775|BRIMONIDINE| PD| 105775|
|105776| TIMOLOL| PD| 105776|
+------+-----------+---+-------+
scala> val overColumns = Window.partitionBy("BFS").orderBy("ID").rowsBetween(1, Window.unboundedFollowing)
overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@eb923ef
scala> df.withColumn("NEXT_PD_TYP_ID",first("TYPPDID", true).over(overColumns)).orderBy("ID").show(false)
+------+-----------+---+-------+-------+
|ID |BFS |TYP|TYPPDID|NEXT_PD_TYP_ID|
+------+-----------+---+-------+-------+
|105771|BRIMONIDINE|PD |105771 |105772 |
|105772|BRIMONIDINE|PD |105772 |105775 |
|105773|BRIMONIDINE|RV |null |105775 |
|105774|TIMOLOL |RV |null |105776 |
|105775|BRIMONIDINE|PD |105775 |null |
|105776|TIMOLOL |PD |105776 |null |
+------+-----------+---+-------+-------+