Вы можете использовать аккумуляторы
class SetAccumulator[T] extends AccumulatorV2[T, Set[T]] {
import scala.collection.JavaConverters._
private val items = new ConcurrentHashMap[T, Boolean]
override def isZero: Boolean = items.isEmpty
override def copy(): AccumulatorV2[T, Set[T]] = {
val other = new SetAccumulator[T]
other.items.putAll(items)
other
}
override def reset(): Unit = items.clear()
override def add(v: T): Unit = items.put(v, true)
override def merge(
other: AccumulatorV2[T, Set[T]]): Unit = other match {
case setAccumulator: SetAccumulator[T] => items.putAll(setAccumulator.items)
}
override def value: Set[T] = items.keys().asScala.toSet
}
val df = Seq("foo", "bar", "foo", "foo").toDF("test")
val acc = new SetAccumulator[String]
spark.sparkContext.register(acc)
df.map {
case Row(str: String) =>
acc.add(str)
str
}.count()
println(acc.value)
Отпечатки
Set(bar, foo)
Обратите внимание, что map
сам по себе ленив, поэтому для фактического форсирования вычислений требуется что-то вроде count
и т. Д.В зависимости от реального варианта использования, другим вариантом будет кэширование фрейма данных и использование простых функций SQL df.select("test").distinct()