Here is one solution using DataFrame and window functions. The idea: filter out the picked rows (pick = 1), join them back on id and group so only the picked row of each group carries a non-null name2, then propagate that name to every row of the group with max over a window partitioned by group (max skips nulls, so only the picked name survives).
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> import org.apache.spark.sql.functions.max
import org.apache.spark.sql.functions.max

scala> val df = Seq((1,1,0,"a"),(2,1,1,"b"),(3,2,0,"c"),(4,2,0,"d"),(5,2,1,"e"),(6,3,1,"f"),(7,3,0,"g"),(8,4,1,"h")).toDF("id","group","pick","name")
df: org.apache.spark.sql.DataFrame = [id: int, group: int ... 2 more fields]
scala> val df2=df.filter('pick===1).withColumnRenamed("pick","pick2").withColumnRenamed("name","name2")
df2: org.apache.spark.sql.DataFrame = [id: int, group: int ... 2 more fields]
scala> df.join(df2,Seq("id","group"),"leftOuter").withColumn("picked_name",max('name2).over(Window.partitionBy('group))).drop("pick2","name2").show
+---+-----+----+----+-----------+
| id|group|pick|name|picked_name|
+---+-----+----+----+-----------+
| 1| 1| 0| a| b|
| 2| 1| 1| b| b|
| 6| 3| 1| f| f|
| 7| 3| 0| g| f|
| 8| 4| 1| h| h|
| 3| 2| 0| c| e|
| 4| 2| 0| d| e|
| 5| 2| 1| e| e|
+---+-----+----+----+-----------+
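As a side note, the self-join can be avoided entirely: mask the non-picked names with when and take max over the same window, which again ignores the nulls. A minimal sketch, assuming the same spark-shell session and df as above (result is just an illustrative name):

// assumption: same spark-shell session, df and Window imports as above
import org.apache.spark.sql.functions.when

// when('pick === 1, 'name) is null for non-picked rows, so max per group
// returns the picked name for every row of that group
val result = df.withColumn(
  "picked_name",
  max(when('pick === 1, 'name)).over(Window.partitionBy('group))
)
result.show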