Избегайте указания схемы дважды (Spark / scala) - PullRequest
0 голосов
/ 08 ноября 2018

Мне нужно перебрать кадр данных в определенном порядке и применить некоторую сложную логику для вычисления нового столбца.

Кроме того, я настоятельно рекомендую делать это в общем виде, чтобы мне не приходилось перечислять все столбцы строки и делали df.as[my_record] или case Row(...) =>, как показано здесь . Вместо этого я хочу получить доступ к столбцам строк по их именам и просто добавить результирующие столбцы в исходную строку.

Приведенный ниже подход работает просто отлично, но я бы хотел не указывать схему дважды: первый раз, чтобы я мог получить доступ к столбцам по имени при итерации, и второй раз для обработки вывода.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

val q = """
select 2 part, 1 id
union all select 2 part, 4 id
union all select 2 part, 3 id
union all select 2 part, 2 id
"""
val df = spark.sql(q)

def f_row(iter: Iterator[Row]) : Iterator[Row] = {
  if (iter.hasNext) {
    def complex_logic(p: Int): Integer = if (p == 3) null else p * 10;

    val head = iter.next
    val schema = StructType(head.schema.fields :+ StructField("result", IntegerType))
    val r =
      new GenericRowWithSchema((head.toSeq :+ complex_logic(head.getAs("id"))).toArray, schema)

    iter.scanLeft(r)((r1, r2) =>
      new GenericRowWithSchema((r2.toSeq :+ complex_logic(r2.getAs("id"))).toArray, schema)
    )
  } else iter
}

val schema = StructType(df.schema.fields :+ StructField("result", IntegerType))
val encoder = RowEncoder(schema)
df.repartition($"part").sortWithinPartitions($"id").mapPartitions(f_row)(encoder).show

Какая информация теряется после применения mapPartitions, поэтому вывод не может быть обработан без явного кодировщика? Как не указывать это?

Ответы [ 3 ]

0 голосов
/ 08 ноября 2018

ОК. Я проверил некоторые из моих искровых кодов и использование .mapPartitions с API набора данных не требует от меня явной сборки / передачи кодировщика.

Вам нужно что-то вроде:

case class Before(part: Int, id: Int)
case class After(part: Int, id: Int, newCol: String)

import spark.implicits._

// Note column names/types must match case class constructor parameters.
val beforeDS = <however you obtain your input DF>.as[Before]

def f_row(it: Iterator[Before]): Iterator[After] = ???

beforeDS.reparition($"part").sortWithinPartitions($"id").mapPartitions(f_row).show
0 голосов
/ 15 ноября 2018

Я нашел ниже объяснение достаточно, может быть, это будет полезно для других.

mapPartitions требует Encoder, поскольку в противном случае он не может создать Dataset из итератора или Row s. Несмотря на то, что каждая строка имеет схему, эта схема не может быть получена (использована) конструктором Dataset[U].

  def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
    new Dataset[U](
      sparkSession,
      MapPartitions[T, U](func, logicalPlan),
      implicitly[Encoder[U]])
  }

С другой стороны, без вызова mapPartitions Spark может использовать схему, полученную из исходного запроса, поскольку структура (метаданные) исходных столбцов не изменилась.

Я описал альтернативы в этом ответе: https://stackoverflow.com/a/53177628/7869491.

0 голосов
/ 08 ноября 2018

Какая информация теряется после применения mapPartitions, поэтому вывод не может быть обработан без

Информация почти не теряется - ее не было с самого начала - подклассы Row или InternalRow в основном являются нетипизированными контейнерами переменной формы, которые не предоставляют никакой полезной информации о типе, которую можно использовать для вывести Encoder.

schema в GenericRowWithSchema несущественно, поскольку описывает контент в терминах метаданных, а не типов.

Как не указывать это?

Извините, вам не повезло. Если вы хотите использовать конструкции с динамической типизацией (сумка Any) на языке статической типизации, вы должны заплатить цену, которая здесь дает Encoder.

...