данные искры изменены после использования метода random в фильтре - PullRequest
1 голос
/ 16 апреля 2020
    val list = List("A", "B", "B", "B", "C", "C", "C", "C", "C")
    val df1 = sc.parallelize(list).toDF("key").withColumn("feature", lit("u4")).select("feature", "key").groupBy(col("feature"), col("key")).count()
    val df2 = sc.parallelize(list).toDF("key").withColumn("feature", lit("u5")).select("feature", "key").groupBy(col("feature"), col("key")).count()
    val df = df1.unionAll(df2)
    df.show

    val udf_sample = udf { (x: String) => {
      Math.random() < 0.3
    }
    }
    df.filter(udf_sample(col("feature"))).show()

вывод:

+-------+---+-----+
|feature|key|count|
+-------+---+-----+
|     u4|  A|    1|
|     u4|  B|    3|
|     u4|  C|    5|
|     u5|  A|    1|
|     u5|  B|    3|
|     u5|  C|    5|
+-------+---+-----+

+-------+---+-----+
|feature|key|count|
+-------+---+-----+
|     u4|  A|    1|
|     u4|  C|    2|
|     u5|  A|    1|
|     u5|  B|    1|
|     u5|  C|    2|
+-------+---+-----+

обратите внимание, что во втором кадре данных значения столбца подсчета отличаются от значений в первом кадре данных. Пока я ожидаю случайной фильтрации некоторых строк.

1 Ответ

0 голосов
/ 16 апреля 2020

Фильтрация функции перед группировкой по. Spark ленив, поэтому преобразование (фильтр, группа) не выполняется, пока не будет найдено действие (печать, счет). Есть хорошая статья об этом здесь

Когда действие найдено, Spark пытается оптимизировать применяемые преобразования, поэтому сначала он запускает фильтры, пытаясь уменьшить количество данных. он должен обработать:

Хотя кеш не является действием, он тоже добьется цели, так как при обнаружении действия он будет вычислять DAG до кеша, а затем использовать результат в дальнейших вычислениях

Вы можете кэшировать df, чтобы фильтр выполнялся после groupBy

INPUT

df.show
+-------+---+-----+
|feature|key|count|
+-------+---+-----+
|     u4|  C|    5|
|     u4|  A|    1|
|     u4|  B|    3|
|     u5|  C|    5|
|     u5|  A|    1|
|     u5|  B|    3|
+-------+---+-----+

ОПРЕДЕЛЕНИЕ UDF

scala> val udf_sample = udf { (x: String) => {
     |       Math.random() < 0.3
     |     }
     |     }
udf_sample: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,BooleanType,Some(List(StringType)))

OUPUT

df.cache

df.filter(udf_sample(col("feature"))).show()
+-------+---+-----+
|feature|key|count|
+-------+---+-----+
|     u4|  C|    5|
|     u4|  B|    3|
|     u5|  C|    5|
+-------+---+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...