Обычным способом решения вашей проблемы будет индексирование информационного кадра и фильтрация индексов, которые больше 2.
Простой подход:
Как предлагается вдругой ответ, вы можете попробовать добавить индекс с помощью monotonically_increasing_id
.
df.withColumn("Index",monotonically_increasing_id)
.filter('Index > 2)
.drop("Index")
. Тем не менее, это будет работать, только если первые 3 строки находятся в первом разделе.Более того, как уже упоминалось в комментариях, сегодня это так, но этот код может полностью сломаться при появлении новых версий или появиться, и это будет очень сложно отладить.Действительно, контракт в API - это просто «Сгенерированный идентификатор гарантированно будет монотонно увеличивающимся и уникальным, но не последовательным».Поэтому не очень мудро предполагать, что они всегда будут начинаться с нуля.В текущей версии могут быть даже другие случаи, когда это не работает (хотя я не уверен).
Чтобы проиллюстрировать мою первую проблему, взгляните на это:
scala> spark.range(4).withColumn("Index",monotonically_increasing_id()).show()
+---+----------+
| id| Index|
+---+----------+
| 0| 0|
| 1| 1|
| 2|8589934592|
| 3|8589934593|
+---+----------+
Мы удалили бы только две строки ...
Безопасный подход:
Предыдущий подход будет работать большую часть времени, хотя, чтобы быть безопасным, вы можете использоватьzipWithIndex
из API RDD для получения последовательных индексов.
def zipWithIndex(df : DataFrame, name : String) : DataFrame = {
val rdd = df.rdd.zipWithIndex
.map{ case (row, i) => Row.fromSeq(row.toSeq :+ i) }
val newSchema = df.schema
.add(StructField(name, LongType, false))
df.sparkSession.createDataFrame(rdd, newSchema)
}
zipWithIndex(df, "index").where('index > 2).drop("index")
Мы можем проверить, что это безопаснее:
scala> zipWithIndex(spark.range(4).toDF("id"), "index").show()
+---+-----+
| id|index|
+---+-----+
| 0| 0|
| 1| 1|
| 2| 2|
| 3| 3|
+---+-----+