Подумав о масштабировании и тому подобном - но неясно, достаточно ли хорош Catalyst, - я предлагаю решение, основанное на одном из ответов, которое может выиграть от разбиения, и для которого гораздо меньше работы - просто думая о данных.Речь идет о предварительных вычислениях и обработке, о том, что некоторые виды массажа могут превзойти методы грубой силы.Ваша точка зрения на JOIN менее важна, так как сейчас это ограниченное JOIN, а не массовая генерация данных.
Ваш комментарий к подходу с фреймами данных немного искажен тем, что все, что здесь превзошло, - это фреймы данных.Я думаю, что вы имеете в виду, что вы хотите перебрать фрейм данных и получить суб-цикл с выходом.Я не могу найти такого примера, и на самом деле я не уверен, что он соответствует парадигме SPARK.Получены те же результаты, но с меньшей обработкой:
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.expressions.Window
val df = Seq((0, true), (1, true), (2,false), (3, true), (4,true), (5,true), (6,false), (7,false), (8,true), (9,false)).toDF("id","flag")
@transient val w1 = org.apache.spark.sql.expressions.Window.orderBy("id1")
val ids = df.where("flag = false")
.select($"id".as("id1"))
val ids2 = ids.select($"*", lag("id1",1,-1).over(w1).alias("prev_id"))
val ids3 = ids2.withColumn("prev_id1", col("prev_id")+1).drop("prev_id")
// Less and better performance at scale, this is better theoretically for Catalyst to bound partitions? Less work to do in any event.
// Some understanding of data required! And no grouping and min.
val withNextFalse = df.join(ids3, df("id") >= ids3("prev_id1") && df("id") <= ids3("id1"))
.select($"id", $"flag", $"id1".alias("nextOrCurrentFalse"))
.orderBy(asc("id"),asc("id"))
withNextFalse.show(false)
также возвращает:
+---+-----+------------------+
|id |flag |nextOrCurrentFalse|
+---+-----+------------------+
|0 |true |2 |
|1 |true |2 |
|2 |false|2 |
|3 |true |6 |
|4 |true |6 |
|5 |true |6 |
|6 |false|6 |
|7 |false|7 |
|8 |true |9 |
|9 |false|9 |
+---+-----+------------------+