Почему спарк-код прекрасно работает в локальном режиме, но возвращает значения null / 0 в кластерном режиме? - PullRequest
0 голосов
/ 24 февраля 2020

Этот код ниже прекрасно работает в локальном режиме с правильным выводом, как и ожидалось, но тот же код возвращает значения 0 и ноль для уникального числа, минимального и максимального профилировщика соответственно, когда мы отправляем задание в режиме кластера .

 val result = sourceData.schema.map(st => {
  val column = st.name.toString
  val nullcount = sourceData.filter(sourceData(column).isNull).count()
  val uniqueCount = sourceData.select(column).distinct().count()


  val df = sourceData
    .agg(countDistinct(col(column)).alias("UniqueCount"), min(col(column)).alias("Min"), max(col(column)).alias("Max"))
    .withColumn("UniqueCount", lit(uniqueCount))
    .withColumn("NullCount", lit(nullcount))
    .withColumn("Column_Name", lit(column))

  df
}).reduce(_ union _)
result.show()

Использование команды ниже для отправки искрового задания в режиме кластера:

spark-submit --class ey.profiler.sparkProfilerClient --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar --deploy-mode client DataProfiling.jar

Я пытался добиться того же, но не так, как показано ниже, но я также наблюдал такое же поведение код или неверный результат в фрейме данных конечного результата.

val resultantDf = sourceData.columns.foldLeft(initialDF)((df1, column) => df1.union({
      val x = sourceData.filter(col(column).isNull).count()
      sourceData
        .agg(countDistinct(col(column)), min(col(column)), max(col(column)))
        .withColumn("Null_Count", lit(x))
        .withColumn("Column_Name", lit(column))


    }))
    resultantDf.show()  

Почему и как решить эту проблему?

...