pyspark сравните две колонки с диагнозом - PullRequest
1 голос
/ 05 марта 2020

Постановка задачи: В pyspark мне нужно сравнить два столбца по диагонали для ex снизу входного фрейма данных. Мне нужно сравнить stn_fr_cd и stn_to_cd, т. Е. Для val_no 1 иметь 2 строки. Теперь я должен сравнить stn_fr_cd первого ряда с stn_to_cd второго ряда и stn_to_cd первого ряда с stn_fr_cd второго ряда.

Снизу входного фрейма данных, так как для val_no оба диагностических элемента stn_fr_Cd и stn_to_cd равны, я увеличил свое значение как 1

Ниже мой вход имеет 4 столбца id, val_no, stn_fr_cd, stn_to_cd

id val_no    stn_fr_cd stn_to_cd

8A   1        CPH      GDN                  

8A   1        GDN      CPH                  

8A   2        GDN      CPH                  

8A   2        CPH      GDN                  

8A   3        CPH      GDN                  

8A   3        GDN      CPH                  

8A   4        CPH      GDN                  

8A   4        GDN      CPH 

Ниже должен быть мой вывод

8A 4

Как получить 4 для val_no 1,2,3,4 и диагональные элементы stn_fr_cd и stn_to_cd равны

Может кто-нибудь, пожалуйста, помогите мне с logi c в pyspark pls. Мне действительно нужно преодолеть это препятствие, пожалуйста, помогите с кодом

1 Ответ

1 голос
/ 05 марта 2020

Я думаю, это то, что вы хотите, я могу ошибаться. Дайте мне знать, если это работает для вас или я могу обновить его. Я использовал оконную функцию, чтобы получить преимущество перед обоими столбцами, и если они оба равны, то этот раздел получит 1, в противном случае 0, а затем просто сгруппируется по id и суммирует мой контрольный столбец. Я добавил еще 2 строки (val_no = 5), чтобы показать, что они не выбраны, поскольку они не удовлетворяют обоим условиям диагоналей.

df.show()

+---+------+---------+---------+
| id|val_no|stn_fr_cd|stn_to_cd|
+---+------+---------+---------+
| 8A|     1|      CPH|      GDN|
| 8A|     1|      GDN|      CPH|
| 8A|     2|      GDN|      CPH|
| 8A|     2|      CPH|      GDN|
| 8A|     3|      CPH|      GDN|
| 8A|     3|      GDN|      CPH|
| 8A|     4|      CPH|      GDN|
| 8A|     4|      GDN|      CPH|
| 8A|     5|      GDN|      GDN|
| 8A|     5|      CPH|      GDN|
+---+------+---------+---------+

from pyspark.sql import functions as F
from pyspark.sql.window import Window
w=Window().partitionBy("id","val_no").orderBy("val_no")

df.withColumn("fr", F.lead("stn_fr_cd").over(w))\
  .withColumn("to", F.lead("stn_to_cd").over(w))\
  .withColumn("check", F.when((F.col("stn_fr_cd")==F.col("to"))&(F.col("stn_to_cd")==F.col("fr")),F.lit(1)).otherwise(F.lit(0)))\
  .groupBy("id").agg(F.sum("check").alias("diagonals")).show()

+---+---------+
| id|diagonals|
+---+---------+
| 8A|        4|
+---+---------+
...