I added one more element (1,5)
for testing.
from pyspark.sql.functions import count, collect_list, min, col, udf
from pyspark.sql.types import BooleanType
from pyspark.sql.window import Window
# Original sample data, kept for reference:
#values = [(1,0),(1,3),(1,4),(2,0),(2,3),(2,3)]
# Sample data: (i, k) pairs; the extra (1, 5) row exercises the
# "other values may be present" case below.
values = [(1, 0), (1, 3), (1, 4), (1, 5), (2, 0), (2, 3), (2, 3)]
df = sqlContext.createDataFrame(values, ['i', 'k'])
df.show()
+---+---+
| i| k|
+---+---+
| 1| 0|
| 1| 3|
| 1| 4|
| 1| 5|
| 2| 0|
| 2| 3|
| 2| 3|
+---+---+
# For every group 'i': keep only groups with at least 3 rows, then attach
# the full list of that group's 'k' values to each remaining row.
w = Window().partitionBy('i')
df = (
    df.withColumn('count', count(col('i')).over(w))
      .where(col('count') >= 3)
      .drop('count')
      .withColumn('list', collect_list(col('k')).over(w))
)
df.show()
+---+---+------------+
| i| k| list|
+---+---+------------+
| 1| 0|[0, 3, 4, 5]|
| 1| 3|[0, 3, 4, 5]|
| 1| 4|[0, 3, 4, 5]|
| 1| 5|[0, 3, 4, 5]|
| 2| 0| [0, 3, 3]|
| 2| 3| [0, 3, 3]|
| 2| 3| [0, 3, 3]|
+---+---+------------+
# UDF that checks whether the collected 'list' contains ALL of the values
# 0, 3 and 4 — other values (such as 5, 7, 8, ...) may also be present.
# NOTE: the explicit BooleanType return type matters: without it the udf
# defaults to StringType, and the subsequent `col('bool') == True` filter
# would rely on an implicit string/boolean cast.
check_for_atleast_0_3_4 = udf(lambda row: {0, 3, 4}.issubset(row), BooleanType())
df1 = df.withColumn('bool', check_for_atleast_0_3_4(col('list'))).where(col('bool') == True).drop('bool', 'list')
df1.show()
+---+---+
| i| k|
+---+---+
| 1| 0|
| 1| 3|
| 1| 4|
| 1| 5|
+---+---+
# UDF that checks whether 'list' contains EXACTLY the values {0, 3, 4}:
# all three must be present and no other value is allowed. If a group's
# 'k' values include e.g. 5 or 7, every row of that 'i' is removed.
# `set(row) == {0, 3, 4}` replaces the original redundant
# `set(list(set(row))).issubset(...) and ...issubset(row)` — a subset
# test in both directions is simply set equality.
check_for_atleast_and_only_0_3_4 = udf(lambda row: set(row) == {0, 3, 4}, BooleanType())
df1 = df.withColumn('bool', check_for_atleast_and_only_0_3_4(col('list'))).where(col('bool') == True).drop('bool', 'list')
df1.show()
+---+---+
| i| k|
+---+---+
+---+---+