Предыдущий ответ, который я разместил, был правильным, но менее распространенным. Из .explain () мы могли наблюдать Exchange SinglePartition
.
Это решение работает параллельно - в физическом плане не найдено ни единого раздела. В масштабе это выглядит хорошо, но .explain становится трудно читать. Это сложный приказ, если вы вообще не хотите об этом думать, так что, возможно, не всем.
Из-за очевидных преобладающих обстоятельств я мог справиться с этим. Это также мое последнее усилие здесь по Data Wrangling, поскольку оно не стоит потенциальных 15 + 10 баллов. Тем не менее, в моем другом ответе также говорится, что подача должна быть улучшена, что будет моим профессиональным советом. Следовательно, это решение, основанное на свободном времени, но оно выделяет несколько моментов и недостатков. Предполагается, что разбиение диапазона является надежным, проверив с помощью .glom, что это действительно так.
Слишком сложно объяснить, посмотреть комментарии и выводы.
Код запущен на ноутбуке:
// May be academic. If low volume of data then my other solution can suffice, this is a distributed+ solution.
// Need 2 repartitions, interesting concept that turns out to be true.
// Runs in parallel but the .explain hard to interprete.
// By extracting twice carefully from parent RDD, hoping that partition awareness is retained and that whole partiton moved to other executor with other partition so as to avoid hashing.
// Not certain if range Partitioning needs to check the ranges.
// Some educational coding aspects left in.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.split
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.functions.{lead, lag}
import spark.implicits._
// Gen example data via DF, can come from files, ordering in those files assumed. I.e. no need to sort.
val df = Seq(
("1 February"), ("n"), ("c"), ("b"),
("2 February"), ("hh"), ("www"), ("e"),
("3 February"), ("y"), ("s"), ("j"),
("1 March"), ("c"), ("b"), ("x"),
("1 March"), ("c"), ("b"), ("x"),
("2 March"), ("c"), ("b"), ("x"),
("3 March"), ("c"), ("b"), ("x"), ("y"), ("z")
).toDF("line")
// Define Case Classes to avoid Row aspects on df --> rdd --> to DF which I always must look up again.
case class X(line: String)
case class Xtra(key: Long, line: String)
// Add the Seq Num using zipWithIndex.
val rdd = df.as[X].rdd.zipWithIndex().map{case (v,k) => (k,v)}
val ds = rdd.toDF("key", "line").as[Xtra]
// An idea that was not totally utilized but left in.
val m = Map("january" -> 1,"february" -> 2,"march" -> 3, "april" -> 4)
val dfMth = m.toSeq.toDF("monthM", "monthNum")
// Num partitions for processing.
// Apply Range Partititioning to allow subsequent distributed processing as opposed to Exchange SinglePartition.
val n = 5 // Can be altered, set here. This value drives parallel processing.
sqlContext.setConf("spark.sql.shuffle.partitions", "5")
// Now records the partition the data is in. The issues is that there is no actual partitioning key and that the data of relevance can be in the next partition
// as well (at most one is the assumption made!), but we wish to have partitioned processing.
val rdd2 = ds.repartitionByRange(n, $"key")
.rdd
.mapPartitionsWithIndex((index, iter) => {
iter.map(x => (index, x ))
})
val df2 = rdd2.toDF("part", "line_data").cache
// Separate data into separate fields.
val df3 = df2.withColumn("_tmp", split($"line_data.line", " "))
.select($"part", $"line_data.key".as("key"),
$"_tmp".getItem(0).as("val"),
$"_tmp".getItem(1).as("month")
)
.drop("_tmp")
// Data with partitioning and seq numbering and some tidy up of fields.
val df4 = df3.withColumn("val", when(col("month").isNull, expr("substring(val,2,length(val)-2)")).otherwise(expr("substring(val,2,length(val))")))
.withColumn("month", expr("substring(month,1,length(month)-1)"))
val df44 = df4.withColumn("partX", $"part")
// Get first N rows per partition, N = 4 here, just an example.
import org.apache.spark.sql.catalyst.encoders.RowEncoder
implicit val encoder = RowEncoder(df4.schema)
val df100 = df4.mapPartitions(iterator => iterator.take(4))
val df100U = df100.withColumn("partX", $"part" - 1)
// Add 1st N rows of next partition to prior partition, except for last partition. Can be set higher if you think more data from previous entry spills over.
val dfC = df44.union(df100U)
// Repartition so that parallellization processing gives correct answer. Assuming Catalyst is partition aware.
// Any other way means lose peformance. This is the parent RDD so we hope that Catalyst is smart enoygh.
val dfD = dfC.repartitionByRange(n, $"partX").cache() // Used twice.
// Get those values with a month entry that are the break points and get the adjacent values for comparison in next step.
val df55 = dfD.join(broadcast(dfMth), lower(dfD.col("month")) === dfMth.col("monthM")).drop("monthM").drop("monthNum")
@transient val w = org.apache.spark.sql.expressions.Window.partitionBy("partX").orderBy("key")
val df555 = df55.withColumn("idxNextDate", (lead("key", 1, 999999999999L).over(w)-1)).toDF("part1", "key1", "val1", "month1", "partX", "idxNextDate").cache()
// LKP table to be compared against with same range partition.
// Get the 'value' entries, not the month, date aspects.
val df66 = dfD.filter( $"month".isNull && $"partX" > -1)
// Make the calculation - in parallel, how well is not clear from DAG.
val dfZ = df66.join( df555, (df66("partX") === df555("partX"))
&& ($"key" geq $"key1") && ($"key" <= $"idxNextDate")
&& ($"part1" === df555("partX"))
&& (df555("partX") =!= -1) )
.select($"val1".alias("date"), $"month1".alias("month"), $"val".alias("lines"), $"key", df555("partX").alias("PX")).orderBy("PX", "key")//.explain()
dfZ.show(100)
Как всегда, для обеспечения безопасности при параллельной обработке необходимо выполнить сортировку, что видно из результатов - здесь не показано. Но это зависит от варианта использования. Процесс сортировки не может увидеть, что данные на самом деле уже отсортированы из-за перераспределения и отрисовки из того же родительского RDD и узких преобразований. Я все это проверил.
== Physical Plan ==
Sort [PX#2022 ASC NULLS FIRST, key#1758L ASC NULLS FIRST], true, 0
...
возвращает:
+----+--------+-----+---+---+
|date| month|lines|key| PX|
+----+--------+-----+---+---+
| 1|February| n| 1| 0|
| 1|February| c| 2| 0|
| 1|February| b| 3| 0|
| 2|February| hh| 5| 0|
| 2|February| www| 6| 0|
| 2|February| e| 7| 0|
| 3|February| y| 9| 1|
| 3|February| s| 10| 1|
| 3|February| j| 11| 1|
| 1| March| c| 13| 2|
| 1| March| b| 14| 2|
| 1| March| x| 15| 2|
| 1| March| c| 17| 2|
| 1| March| b| 18| 2|
| 1| March| x| 19| 2|
| 2| March| c| 21| 3|
| 2| March| b| 22| 3|
| 2| March| x| 23| 3|
| 3| March| c| 25| 4|
| 3| March| b| 26| 4|
| 3| March| x| 27| 4|
| 3| March| y| 28| 4|
| 3| March| z| 29| 4|
+----+--------+-----+---+---+
Вам не нужна сортировка, но вопрос немного неясен, если сортировка на самом деле требуется. Я думаю, улучшить подачу.