Вот решение.Сначала мы выясняем для каждого теста, имеет ли он значение Y.
import pyspark.sql.functions as sf
by_test = df.groupBy('tests').agg(sf.sum((sf.col('val') == 'Y').cast('int')).alias('HasY'))
by_test.show()
+-----+----+
|tests|HasY|
+-----+----+
|test4| 1|
|test3| 0|
|test1| 1|
|test2| 1|
+-----+----+
Присоединиться к исходному фрейму данных
df = df.join(by_test, on='tests')
df.show()
+-----+---+---+----+
|tests|val|asd|HasY|
+-----+---+---+----+
|test4| Y| 5| 1|
|test3| N| 4| 0|
|test1| Y| 1| 1|
|test1| N| 2| 1|
|test1| N| 3| 1|
|test2| N| 2| 1|
|test2| Y| 1| 1|
+-----+---+---+----+
Создать новый столбец с тем же именем, используя когда / в противном случае
df = df.withColumn('val', sf.when(sf.col('HasY') > 0, 'Y').otherwise(sf.col('val')))
df = df.drop('HasY')
df.show()
+-----+---+---+
|tests|val|asd|
+-----+---+---+
|test4| Y| 5|
|test3| N| 4|
|test1| Y| 1|
|test1| Y| 2|
|test1| Y| 3|
|test2| Y| 2|
|test2| Y| 1|
+-----+---+---+