По просьбе ОП я записываю ответ, который написал под комментариями.
Цель рассматриваемой проблемы состоит в том, чтобы отфильтровать DataFrame
, где каждый конкретный ID
имеет только элемент Type
A
и ни один из других.
# Loading the requisite packages
from pyspark.sql.functions import col, collect_set, array_contains, size, first
Идея состоит в том, чтобы aggregate()
DataFrame
сначала ID
, в результате чего мы группируем все unique
элементы Type
, используя collect_set()
в массив. Важно иметь unique
элементов, потому что может случиться так, что для определенного ID
может быть две строки, причем обе строки имеют Type
как A
. Вот почему мы должны использовать collect_set()
, а не collect_list()
, потому что последний не будет возвращать уникальные элементы, а скорее все элементы.
Затем мы должны использовать first()
, чтобы получить первое значение Type
и Value
в группе. Если A
является единственным unique
Type
, возможным для определенного ID
, то first()
вернет единственное значение A
, если A
встречается один раз, и верхнее значение, если есть дубликаты A
.
df = df = df.groupby(['ID']).agg(first(col('Type')).alias('Type'),
first(col('Value')).alias('Value'),
collect_set('Type').alias('Type_Arr'))
df.show()
+---+----+-----+---------+
| ID|Type|Value| Type_Arr|
+---+----+-----+---------+
|ID2| A| 12| [A]|
|ID3| A| 3|[A, B, C]|
|ID1| A| 1| [A, B]|
|ID4| A| 10| [A]|
+---+----+-----+---------+
Наконец, мы поставим 2 условия одновременно, чтобы отфильтровать требуемый набор данных.
Условие 1: Проверяется наличие A
в массиве Type
с использованием array_contains()
.
Условие 2: Проверяет size
массива. Если размер больше 1, то должно быть кратно Types
.
df = df.where(array_contains(col('Type_Arr'),'A') & (size(col('Type_Arr'))==1)).drop('Type_Arr')
df.show()
+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID2| A| 12|
|ID4| A| 10|
+---+----+-----+