Эффективный способ сбора HashSet во время работы с картой на некотором наборе данных - PullRequest
0 голосов
/ 14 мая 2019

У меня большой набор данных для преобразования одной структуры в другую. Во время этой фазы я также хочу собрать некоторую информацию о вычисляемом поле (квадрики для заданных значений lat / long). Я не хочу прикреплять эту информацию к каждой строке результата, так как это дало бы много информации дублирования и накладных расходов памяти. Все, что мне нужно, это знать, к каким конкретным квадкейкам относятся заданные координаты. Если есть какой-то способ сделать это за одно задание, чтобы не повторять набор данных дважды?

def load(paths: Seq[String]): (Dataset[ResultStruct], Dataset[String]) = {
    val df = sparkSession.sqlContext.read.format("com.databricks.spark.csv").option("header", "true")
      .schema(schema)
      .option("delimiter", "\t")
      .load(paths:_*)
      .as[InitialStruct]
    val qkSet = mutable.HashSet.empty[String]

    val result = df.map(c => {
      val id = c.id
      val points = toPoints(c.geom)
      points.foreach(p => qkSet.add(Quadkey.get(p.lat, p.lon, 6).getId))
      createResultStruct(id, points)
    })
    return result, //some dataset created from qkSet's from all executors
}

1 Ответ

1 голос
/ 14 мая 2019

Вы можете использовать аккумуляторы

class SetAccumulator[T] extends AccumulatorV2[T, Set[T]] {
  import scala.collection.JavaConverters._

  private val items = new ConcurrentHashMap[T, Boolean]

  override def isZero: Boolean = items.isEmpty
  override def copy(): AccumulatorV2[T, Set[T]] = {
    val other = new SetAccumulator[T]
    other.items.putAll(items)
    other
  }
  override def reset(): Unit = items.clear()
  override def add(v: T): Unit = items.put(v, true)
  override def merge(
        other: AccumulatorV2[T, Set[T]]): Unit = other match {
    case setAccumulator: SetAccumulator[T] => items.putAll(setAccumulator.items)
  }
  override def value: Set[T] = items.keys().asScala.toSet
}

val df = Seq("foo", "bar", "foo", "foo").toDF("test")

val acc = new SetAccumulator[String]
spark.sparkContext.register(acc)

df.map {
  case Row(str: String) =>
    acc.add(str)
    str
}.count()

println(acc.value)

Отпечатки

Set(bar, foo)

Обратите внимание, что map сам по себе ленив, поэтому для фактического форсирования вычислений требуется что-то вроде count и т. Д.В зависимости от реального варианта использования, другим вариантом будет кэширование фрейма данных и использование простых функций SQL df.select("test").distinct()

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...