Мне нужно перебрать кадр данных в определенном порядке и применить некоторую сложную логику для вычисления нового столбца.
Кроме того, я настоятельно рекомендую делать это в общем виде, чтобы мне не приходилось перечислять все столбцы строки и делали df.as[my_record]
или case Row(...) =>
, как показано здесь . Вместо этого я хочу получить доступ к столбцам строк по их именам и просто добавить результирующие столбцы в исходную строку.
Приведенный ниже подход работает просто отлично, но я бы хотел не указывать схему дважды: первый раз, чтобы я мог получить доступ к столбцам по имени при итерации, и второй раз для обработки вывода.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
val q = """
select 2 part, 1 id
union all select 2 part, 4 id
union all select 2 part, 3 id
union all select 2 part, 2 id
"""
val df = spark.sql(q)
def f_row(iter: Iterator[Row]) : Iterator[Row] = {
if (iter.hasNext) {
def complex_logic(p: Int): Integer = if (p == 3) null else p * 10;
val head = iter.next
val schema = StructType(head.schema.fields :+ StructField("result", IntegerType))
val r =
new GenericRowWithSchema((head.toSeq :+ complex_logic(head.getAs("id"))).toArray, schema)
iter.scanLeft(r)((r1, r2) =>
new GenericRowWithSchema((r2.toSeq :+ complex_logic(r2.getAs("id"))).toArray, schema)
)
} else iter
}
val schema = StructType(df.schema.fields :+ StructField("result", IntegerType))
val encoder = RowEncoder(schema)
df.repartition($"part").sortWithinPartitions($"id").mapPartitions(f_row)(encoder).show
Какая информация теряется после применения mapPartitions
, поэтому вывод не может быть обработан без явного кодировщика? Как не указывать это?