Pyspark, как отфильтровать данные в datafram, которые существуют в списке - PullRequest
0 голосов
/ 16 мая 2019

Я пытаюсь отфильтровать данные в фрейме данных, который существует в другом списке кортежей.

Например, мой существующий_дф содержит следующие данные:

| one_id |  another_id  | val1 | val2 | val3 |
-------------------------------------------------
| 1      |  11          | a1   | b1   | c1   |
| 2      |  12          | a2   | b2   | c2   |
| 3      |  13          | a3   | b3   | c3   |
| 4      |  14          | a4   | b4   | c4   |

list1 = ['1','2']
list2 = ['11','12']

Ожидаемый результат -следующий фрейм данных:

| one_id |  another_id  | val1 | val2 | val3 |
-------------------------------------------------
| 3      |  13          | a3   | b3   | c3   |
| 4      |  14          | a4   | b4   | c4   |

В своем коде я делаю следующее:

keys = some_df.select("one_id", "another_id")\
    .rdd.map(lambda r: (str(r[0]),str(r[1]),)).distinct().collect()
list1, list2 = zip(*keys)


existing_df.filter(
    ~existing_df.one_id.fn.isin(list1) & ~existing_df.another_id.fn.isin(list2)
)

и я и получаю следующее исключение:

{Py4JJavaError}An error occurred while calling z:org.apache.spark.sql.functions.lit.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [oneId1, oneId2]
  at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:78)
  at org.apache.spark.sql.catalyst.expressions.Literal$.$anonfun$create$2(literals.scala:164)
  at scala.util.Failure.getOrElse(Try.scala:222)
  at org.apache.spark.sql.catalyst.expressions.Literal$.create(literals.scala:164)
  at org.apache.spark.sql.functions$.typedLit(functions.scala:127)
  at org.apache.spark.sql.functions$.lit(functions.scala:110)
  at org.apache.spark.sql.functions.lit(functions.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodIn...

1 Ответ

0 голосов
/ 17 мая 2019

Вы почти рядом.

df = spark.createDataFrame([(1,11,'a1','b1','c1'),(2,12,'a2','b2','c2'),(3,13,'a3','b3','c3'),(4,14,'a4','b4','c4')],['one_id','another_id','val1','val2','val3'])
list1 = ['1','2']
list2 = ['11','12']

Вы можете использовать NOT isin() через этот синтаксис (isin() == False или ~isin())

from pyspark.sql.functions import col
df.where((col('one_id').isin(list1) == False) & (col('another_id').isin(list2) == False)).show()

#+------+----------+----+----+----+
#|one_id|another_id|val1|val2|val3|
#+------+----------+----+----+----+
#|     3|        13|  a3|  b3|  c3|
#|     4|        14|  a4|  b4|  c4|
#+------+----------+----+----+----+
...