Дразнить такую функцию, чтобы она производила последовательность, не просто. Действительно, spark - это механизм параллельных вычислений, и поэтому последовательный доступ к данным затруднен.
Вот решение, которое вы можете попробовать.
Давайте определим функцию, которая упаковывает данные в фрейм данных:
def zip(df : DataFrame, name : String) = {
df.withColumn(name, monotonically_increasing_id)
}
Тогда давайте перепишем функцию, которую мы хотим протестировать, используя эту функцию zip по умолчанию:
def fun(df : DataFrame,
zipFun : (DataFrame, String) => DataFrame = zip) : DataFrame = {
zipFun(df, "id_row")
}
// let 's see what it does
fun(spark.range(5).toDF).show()
+---+----------+
| id| id_row|
+---+----------+
| 0| 0|
| 1| 1|
| 2|8589934592|
| 3|8589934593|
| 4|8589934594|
+---+----------+
Это то же самое, что и раньше, давайте напишем новую функцию, которая использует zipWithIndex
из API RDD. Это немного утомительно, потому что мы должны переключаться между двумя API.
def zip2(df : DataFrame, name : String) = {
val rdd = df.rdd.zipWithIndex
.map{ case (row, i) => Row.fromSeq(row.toSeq :+ i) }
val newSchema = df.schema.add(StructField(name, LongType, false))
df.sparkSession.createDataFrame(rdd, newSchema)
}
fun(spark.range(5).toDF, zip2)
+---+------+
| id|id_row|
+---+------+
| 0| 0|
| 1| 1|
| 2| 2|
| 3| 3|
| 4| 4|
+---+------+
Вы можете адаптировать zip2
, например, умножив i
на 2, чтобы получить то, что вы хотите.