Несколько условий AND для одного столбца в pyspark без операции соединения - PullRequest
0 голосов
/ 28 февраля 2019

У меня есть таблица из трех столбцов [s,p,o].Я хотел бы удалить строки, чтобы для каждой записи в s столбец p не включал в себя значения [P625, P36].Например,

+----+----+------
|   s|   p|  o  |
+----+----+-----|
| Q31| P36| Q239|
| Q31|P625|   51|
| Q45| P36| Q597|
| Q45|P625|  123|
| Q51|P625|   22|
| Q24|P625|   56|

Конечный результат должен быть

+----+----+------
|   s|   p|  o  |
+----+----+-----|
| Q31| P36| Q239|
| Q31|P625|   51|
| Q45| P36| Q597|
| Q45|P625|  123|

Используя операцию соединения, описанная выше задача проста.

df.filter(df.p=='P625').join(df.filter(df.p=='P36'),'s')

Но есть ли более элегантный способ сделать это?

Ответы [ 2 ]

0 голосов
/ 01 марта 2019

Вам нужно окно

from pyspark.sql import Window
from pyspark.sql.functions import *

winSpec = Window.partitionBy('s')
df.withColumn("s_list", collect_list("s").over(winSpec)).
filter(array_contains(col("s_list"), "P625") & array_contains(col("s_list"), "P36") & size(col("s_list")) = 2)
0 голосов
/ 01 марта 2019

Простите, так как я гораздо лучше знаком с API Scala, но, возможно, вы можете легко преобразовать его:

scala> val df = spark.createDataset(Seq(
     |      ("Q31", "P36", "Q239"),
     |      ("Q31", "P625", "51"),
     |      ("Q45", "P36", "Q597"),
     |      ("Q45", "P625", "123"),
     |      ("Q51", "P625", "22"),
     |      ("Q24", "P625", "56")
     | )).toDF("s", "p", "o")
df: org.apache.spark.sql.DataFrame = [s: string, p: string ... 1 more field]

scala> (df.select($"s", struct($"p", $"o").as("po"))
     |   .groupBy("s")
     |   .agg(collect_list($"po").as("polist"))
     |   .as[(String, Array[(String, String)])]
     |   .flatMap(r => {
     |     val ps = r._2.map(_._1).toSet
     |           if(ps("P625") && ps("P36")) {
     |             r._2.flatMap(po => Some(r._1, po._1, po._2))
     |           } else {
     |             None
     |           }
     |   }).toDF("s", "p", "o")
     |   .show())
+---+----+----+                                                                 
|  s|   p|   o|
+---+----+----+
|Q31| P36|Q239|
|Q31|P625|  51|
|Q45| P36|Q597|
|Q45|P625| 123|
+---+----+----+

Для справки, вышеприведенная команда join() вернула бы:

scala> df.filter($"p" === "P625").join(df.filter($"p" === "P36"), "s").show
+---+----+---+---+----+
|  s|   p|  o|  p|   o|
+---+----+---+---+----+
|Q31|P625| 51|P36|Q239|
|Q45|P625|123|P36|Q597|
+---+----+---+---+----+

Что может быть использовано и в вашем окончательном решении, возможно, с меньшим количеством кода, но я не уверен, какой метод будет более эффективным, поскольку он в значительной степени зависит от данных.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...