Этот код ниже прекрасно работает в локальном режиме с правильным выводом, как и ожидалось, но тот же код возвращает значения 0 и ноль для уникального числа, минимального и максимального профилировщика соответственно, когда мы отправляем задание в режиме кластера .
val result = sourceData.schema.map(st => {
val column = st.name.toString
val nullcount = sourceData.filter(sourceData(column).isNull).count()
val uniqueCount = sourceData.select(column).distinct().count()
val df = sourceData
.agg(countDistinct(col(column)).alias("UniqueCount"), min(col(column)).alias("Min"), max(col(column)).alias("Max"))
.withColumn("UniqueCount", lit(uniqueCount))
.withColumn("NullCount", lit(nullcount))
.withColumn("Column_Name", lit(column))
df
}).reduce(_ union _)
result.show()
Использование команды ниже для отправки искрового задания в режиме кластера:
spark-submit --class ey.profiler.sparkProfilerClient --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar --deploy-mode client DataProfiling.jar
Я пытался добиться того же, но не так, как показано ниже, но я также наблюдал такое же поведение код или неверный результат в фрейме данных конечного результата.
val resultantDf = sourceData.columns.foldLeft(initialDF)((df1, column) => df1.union({
val x = sourceData.filter(col(column).isNull).count()
sourceData
.agg(countDistinct(col(column)), min(col(column)), max(col(column)))
.withColumn("Null_Count", lit(x))
.withColumn("Column_Name", lit(column))
}))
resultantDf.show()
Почему и как решить эту проблему?