Как найти следующий элемент из текущей строки во фрейме данных с помощью Spark Windowing? - PullRequest
0 голосов
/ 26 июня 2019

У меня есть следующий Dataframe:

+------+----------+-------------+--------------------+---------+-----+----------+
|ID    |MEM_ID    | BFS         | SVC_DT             |TYP      |SEQ  |BFS_SEQ   |
+------+----------+----------------------------------+---------+-----+----------+
|105771|29378668  | BRIMONIDINE | 2019-02-04 00:00:00|PD       |1    |1         |
|105772|29378668  | BRIMONIDINE | 2019-04-04 00:00:00|PD       |2    |2         |
|105773|29378668  | BRIMONIDINE | 2019-04-17 00:00:00|RV       |3    |3         |
|105774|29378668  | TIMOLOL     | 2019-04-17 00:00:00|RV       |4    |1         |
|105775|29378668  | BRIMONIDINE | 2019-04-22 00:00:00|PD       |5    |4         |
|105776|29378668  | TIMOLOL     | 2019-04-22 00:00:00|PD       |6    |2         |
+------+----------+----------------------------------+---------+-----+----------+

Для каждой строки я должен найти вхождение следующего типа 'PD' на уровне BFS из текущей строки и заполнить связанный с ним идентификатор как новый столбецс именем 'NEXT_PD_TYP_ID'

Ожидаемый вывод:

+------+---------+-------------+--------------------+----+-----+---------+---------------+
|ID    |MEM_ID   | BFS         | SVC_DT             |TYP |SEQ  |BFS_SEQ  |NEXT_PD_TYP_ID |
+------+---------+----------------------------------+----+-----+---------+---------------+
|105771|29378668 | BRIMONIDINE | 2019-02-04 00:00:00|PD  |1    |1        |105772         |
|105772|29378668 | BRIMONIDINE | 2019-04-04 00:00:00|PD  |2    |2        |105775         | 
|105773|29378668 | BRIMONIDINE | 2019-04-17 00:00:00|RV  |3    |3        |105775         |
|105774|29378668 | TIMOLOL     | 2019-04-17 00:00:00|RV  |4    |1        |105776         |
|105775|29378668 | BRIMONIDINE | 2019-04-22 00:00:00|PD  |5    |4        |null           | 
|105776|29378668 | TIMOLOL     | 2019-04-22 00:00:00|PD  |6    |2        |null           |
+------+---------+----------------------------------+----+-----+---------+---------------+

Нужна помощь.

Я пытался использовать условное агрегирование: max (when), однако с тех пор, какон имеет более одного 'PD', максимум возвращает только одно значение для всех строк.

Нет сообщений об ошибках

1 Ответ

0 голосов
/ 27 июня 2019

Надеюсь, это поможет. Я создал новый столбец с идентификаторами TYP === PD. Я назвал это TYPPDID. Затем я использовал оконную рамку в диапазоне от следующей строки до неограниченной следующей строки и получил первый ненулевой TYPPDID orderBy("ID") в конце концов, только чтобы показать записи в порядке.

import org.apache.spark.sql.functions._

val df = Seq(
("105771", "BRIMONIDINE", "PD"),
("105772", "BRIMONIDINE", "PD"),
("105773", "BRIMONIDINE","RV"),
("105774", "TIMOLOL", "RV"),
("105775", "BRIMONIDINE", "PD"),
("105776", "TIMOLOL", "PD")
).toDF("ID", "BFS", "TYP").withColumn("TYPPDID", when($"TYP" === "PD", $"ID"))
df: org.apache.spark.sql.DataFrame = [ID: string, BFS: string ... 2 more fields]

scala> df.show
+------+-----------+---+-------+
|    ID|        BFS|TYP|TYPPDID|
+------+-----------+---+-------+
|105771|BRIMONIDINE| PD| 105771|
|105772|BRIMONIDINE| PD| 105772|
|105773|BRIMONIDINE| RV|   null|
|105774|    TIMOLOL| RV|   null|
|105775|BRIMONIDINE| PD| 105775|
|105776|    TIMOLOL| PD| 105776|
+------+-----------+---+-------+


scala> val overColumns = Window.partitionBy("BFS").orderBy("ID").rowsBetween(1, Window.unboundedFollowing)
overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@eb923ef


scala> df.withColumn("NEXT_PD_TYP_ID",first("TYPPDID", true).over(overColumns)).orderBy("ID").show(false)
+------+-----------+---+-------+-------+
|ID    |BFS        |TYP|TYPPDID|NEXT_PD_TYP_ID|
+------+-----------+---+-------+-------+
|105771|BRIMONIDINE|PD |105771 |105772 |
|105772|BRIMONIDINE|PD |105772 |105775 |
|105773|BRIMONIDINE|RV |null   |105775 |
|105774|TIMOLOL    |RV |null   |105776 |
|105775|BRIMONIDINE|PD |105775 |null   |
|105776|TIMOLOL    |PD |105776 |null   |
+------+-----------+---+-------+-------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...