Извлечение определенных строк в PySpark - PullRequest
1 голос
/ 09 апреля 2019

У меня есть такой фрейм данных

data = [(("ID1", "A", 1)), (("ID1", "B", 5)), (("ID2", "A", 12)), 
       (("ID3", "A", 3)), (("ID3", "B", 3)), (("ID3", "C", 5)), (("ID4", "A", 10))]
df = spark.createDataFrame(data, ["ID", "Type", "Value"])
df.show()

+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID1|   A|    1|
|ID1|   B|    5|
|ID2|   A|   12|
|ID3|   A|    3|
|ID3|   B|    3|
|ID3|   C|    5|
|ID4|   A|   10|
+---+----+-----+

Я хочу извлечь только те строки (или идентификаторы), которые содержат только один конкретный тип - "A"

Следовательно, мой ожидаемый результат будетсодержать следующие строки

+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID2|   A|    1|
|ID4|   A|   10|
+---+----+-----+

Для каждого идентификатора может содержаться любой тип - A, B, C и т. д. Я хочу извлечь те идентификаторы, которые содержат один и только один тип - 'A'

Как я могу добиться этого в PySpark

Ответы [ 3 ]

4 голосов
/ 10 апреля 2019

По просьбе ОП я записываю ответ, который написал под комментариями.

Цель рассматриваемой проблемы состоит в том, чтобы отфильтровать DataFrame, где каждый конкретный ID имеет только элемент Type A и ни один из других.

# Loading the requisite packages
from pyspark.sql.functions import col, collect_set, array_contains, size, first

Идея состоит в том, чтобы aggregate() DataFrame сначала ID, в результате чего мы группируем все unique элементы Type, используя collect_set() в массив. Важно иметь unique элементов, потому что может случиться так, что для определенного ID может быть две строки, причем обе строки имеют Type как A. Вот почему мы должны использовать collect_set(), а не collect_list(), потому что последний не будет возвращать уникальные элементы, а скорее все элементы.

Затем мы должны использовать first(), чтобы получить первое значение Type и Value в группе. Если A является единственным unique Type, возможным для определенного ID, то first() вернет единственное значение A, если A встречается один раз, и верхнее значение, если есть дубликаты A.

df = df = df.groupby(['ID']).agg(first(col('Type')).alias('Type'),
                                 first(col('Value')).alias('Value'),
                                 collect_set('Type').alias('Type_Arr'))
df.show()
+---+----+-----+---------+
| ID|Type|Value| Type_Arr|
+---+----+-----+---------+
|ID2|   A|   12|      [A]|
|ID3|   A|    3|[A, B, C]|
|ID1|   A|    1|   [A, B]|
|ID4|   A|   10|      [A]|
+---+----+-----+---------+

Наконец, мы поставим 2 условия одновременно, чтобы отфильтровать требуемый набор данных.

Условие 1: Проверяется наличие A в массиве Type с использованием array_contains().

Условие 2: Проверяет size массива. Если размер больше 1, то должно быть кратно Types.

df = df.where(array_contains(col('Type_Arr'),'A') & (size(col('Type_Arr'))==1)).drop('Type_Arr')
df.show()
+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID2|   A|   12|
|ID4|   A|   10|
+---+----+-----+
4 голосов
/ 09 апреля 2019

Вы можете применить к нему фильтр.

import pyspark.sql.functions as f

data = [(("ID1", "A", 1)), (("ID1", "B", 5)), (("ID2", "A", 12)), 
       (("ID3", "A", 3)), (("ID3", "B", 3)), (("ID3", "C", 5)), (("ID4", "A", 10))]
df = spark.createDataFrame(data, ["ID", "Type", "Value"])
df.show()

+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID1|   A|    1|
|ID1|   B|    5|
|ID2|   A|   12|
|ID3|   A|    3|
|ID3|   B|    3|
|ID3|   C|    5|
|ID4|   A|   10|
+---+----+-----+

x= df.filter(f.col('Type')=='A')

x.show()

Если нам нужно отфильтровать все идентификаторы, которые имеют только одну запись и тоже с Типом как 'A', то приведенный ниже код может быть решением


df.registerTempTable('table1')


sqlContext.sql('select a.ID, a.Type,a.Value from table1 as a, (select ID, count(*) as cnt_val from table1 group by ID) b where a.ID = b.ID and (a.Type=="A" and b.cnt_val ==1)').show()


+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID2|   A|   12|
|ID4|   A|   10|
+---+----+-----+


Будут лучшие альтернативные способы найти то же самое.

3 голосов
/ 09 апреля 2019

Я не бегло говорю на Python, вот возможное решение в Scala:

df.groupBy("ID").agg(collect_set("Type").as("Types"))
  .select("ID").where((size($"Types")===1).and(array_contains($"Types", "A"))).show()
+---+
| ID|
+---+
|ID2|
|ID4|
+---+

Идея состоит в группировании по ID и фильтрации только Types размера 1, содержащего значение A.

...