Промывка искры Dataframe на шоу / счет - PullRequest
0 голосов
/ 06 марта 2019

Я пытаюсь распечатать счетчик данных, а затем первые несколько строк, прежде чем окончательно отправить его на дальнейшую обработку.

Странно, но после вызова count() кадр данных становится пустым.

val modifiedDF = funcA(sparkDF)
val deltaDF = modifiedDF.except(sparkDF)
println(deltaDF.count()) // prints 10
println(deltaDF.count())  //prints 0, similar behavior with show  
funcB(deltaDF) //gets null dataframe

Мне удалось проверить то же самое, используя deltaDF.collect.foreach(println) и последующие вызовы count.

Однако, если я не вызову count или show, а просто отправлю его как есть, funcB получит весь DF с 10 строками.

Ожидается ли это?

Определение funcA() и его зависимостей:

def funcA(inputDataframe: DataFrame): DataFrame = {
    val col_name = "colA"
    val modified_df = inputDataframe.withColumn(col_name, customUDF(col(col_name)))
    val modifiedDFRaw = modified_df.limit(10)
    modifiedDFRaw.withColumn("colA", modifiedDFRaw.col("colA").cast("decimal(38,10)"))
}


val customUDF = udf[Option[java.math.BigDecimal], java.math.BigDecimal](myUDF)


def myUDF(sval: java.math.BigDecimal): Option[java.math.BigDecimal] = {
        val strg_name = Option(sval).getOrElse(return None)
        if (change_cnt < 20)  { 
                    change_cnt = change_cnt + 1
                     Some(strg_name.multiply(new java.math.BigDecimal("1000")))
        } else {
            Some(strg_name)
        } 
    }

Ответы [ 2 ]

1 голос
/ 07 марта 2019

Прежде всего, функция, используемая как UserDefinedFunction, должна быть хотя бы идемпотентной, но оптимально чистой.В противном случае результаты просто недетерминированы.Несмотря на то, что в последних версиях есть какой-то аварийный люк (можно подсказать Spark, что функция не должна быть повторно выполнена), это вам не поможет.

Более того, наличие изменяемой стабильной версии (это не совсем такясно, что является источником change_cnt, но он и записан, и прочитан в udf), так как просто не идет - Spark не обеспечивает глобальное изменяемое состояние .

В целомВаш код:

  • Изменяет локальную копию какого-либо объекта.
  • Принимает решение на основе такого объекта.

К сожалению, оба компонента просто не подлежат утилизации.Вам придется вернуться к этапу планирования и переосмыслить свой дизайн.

0 голосов
/ 08 марта 2019

Ваш Dataframe является распределенным набором данных, и попытка сделать count () возвращает непредсказуемые результаты, поскольку count () может отличаться в каждом узле.Прочитайте документацию о СДР ниже.Это применимо и к кадрам данных.

https://spark.apache.org/docs/2.3.0/rdd-programming-guide.html#understanding-closures- https://spark.apache.org/docs/2.3.0/rdd-programming-guide.html#printing-elements-of-an-rdd

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...