фиктивные функции искровых колонн в scala - PullRequest
3 голосов
/ 28 марта 2019

Мой код использует monotonically_increasing_id функция scala

val df = List(("oleg"), ("maxim")).toDF("first_name")
   .withColumn("row_id", monotonically_increasing_id)

Я хочу смоделировать его в моем модульном тесте, чтобы он возвращал целые числа 0, 1, 2, 3, ...

В моем spark-shell он возвращает желаемый результат.

scala> df.show
+----------+------+
|first_name|row_id|
+----------+------+
|      oleg|     0|
|     maxim|     1|
+----------+------+

Но в моих приложениях scala результаты отличаются.

Как я могу имитировать функции столбцов?

Ответы [ 2 ]

2 голосов
/ 28 марта 2019

Дразнить такую ​​функцию, чтобы она производила последовательность, не просто. Действительно, spark - это механизм параллельных вычислений, и поэтому последовательный доступ к данным затруднен.

Вот решение, которое вы можете попробовать.

Давайте определим функцию, которая упаковывает данные в фрейм данных:

    def zip(df : DataFrame, name : String) = {
        df.withColumn(name, monotonically_increasing_id)
    }

Тогда давайте перепишем функцию, которую мы хотим протестировать, используя эту функцию zip по умолчанию:

    def fun(df : DataFrame,
            zipFun : (DataFrame, String) => DataFrame = zip) : DataFrame = {
        zipFun(df, "id_row")
    }
    // let 's see what it does
    fun(spark.range(5).toDF).show()
    +---+----------+
    | id|    id_row|
    +---+----------+
    |  0|         0|
    |  1|         1|
    |  2|8589934592|
    |  3|8589934593|
    |  4|8589934594|
    +---+----------+

Это то же самое, что и раньше, давайте напишем новую функцию, которая использует zipWithIndex из API RDD. Это немного утомительно, потому что мы должны переключаться между двумя API.

    def zip2(df : DataFrame, name : String) = {
        val rdd = df.rdd.zipWithIndex
            .map{ case (row, i) => Row.fromSeq(row.toSeq :+ i) }
        val newSchema = df.schema.add(StructField(name, LongType, false))
        df.sparkSession.createDataFrame(rdd, newSchema)
    }
    fun(spark.range(5).toDF, zip2)
    +---+------+
    | id|id_row|
    +---+------+
    |  0|     0|
    |  1|     1|
    |  2|     2|
    |  3|     3|
    |  4|     4|
    +---+------+

Вы можете адаптировать zip2, например, умножив i на 2, чтобы получить то, что вы хотите.

0 голосов
/ 28 марта 2019

Основываясь на ответе @Oli, я нашел следующий обходной путь:

val df = List(("oleg"), ("maxim")).toDF("first_name")
   .withColumn("row_id", monotonically_increasing_id)
   .withColumn("test_id", row_number().over(Window.orderBy("row_id")))

Это решает мою проблему, но я все еще интересуюсь функциями насмешливого столбца.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...