TL; DR
Кажется, существует несоответствие в том, как Spark относится к шкале BigDecimals
, которая проявляется в конкретном случае, показанном в вопросе.Код ведет себя так, как будто он преобразует BigDecimal
s в немасштабированный Long
s, используя масштаб объекта BigDecimal
, но затем преобразует обратно в BigDecimal
, используя масштаб схемы.
Эту проблему можно обойти, либо
- , явно установив масштаб на все значения
BigDecimal
, чтобы соответствовать схеме DataFrame, используя setScale
, либо - вручнуюуказание схемы и создание DF из RDD [Row]
Длинная версия
Вот что я думаю происходит на моей машине с Spark 2.4.0.
В случае groupBy.max
Spark выполняет UnsafeRow и преобразует BigDecimal
в немасштабированный Long
и сохраняет его как массив байтов в setDecimal
в эта строка (проверяется с помощью операторов печати).Затем, когда он позже вызывает getDecimal , он преобразует байтовый массив обратно в BigDecimal
с использованием масштаба , указанного в схеме .
Если масштаб в исходном значении не соответствует масштабу в схеме, это приводит к неверному значению.Например,
val foo = BigDecimal(123456)
foo.scale
0
val bytes = foo.underlying().unscaledValue().toByteArray()
// convert the bytes into BigDecimal using the original scale -- correct value
val sameValue = BigDecimal(new java.math.BigInteger(bytes), 0)
sameValue: scala.math.BigDecimal = 123456
// convert the bytes into BigDecimal using scale 18 -- wrong value
val smaller = BigDecimal(new java.math.BigInteger(bytes), 18)
smaller: scala.math.BigDecimal = 1.23456E-13
Если я просто выберу максимум из столбца bd_value
, Spark, похоже, не пройдет setDecimal
.Я не проверял, почему или куда он идет.
Но это объяснило бы значения, наблюдаемые в вопросе.Использование того же класса наблюдений Foo
:
// This BigDecimal has scale 0
val rdd = spark.sparkContext.parallelize(Seq(Foo("C", BigDecimal(123456), 123456.0)))
// And shows with scale 0 in the DF
rdd.toDF.show
+-----+--------+--------+
|group|bd_value| d_value|
+-----+--------+--------+
| C| 123456|123456.0|
+-----+--------+--------+
// But the schema has scale 18
rdd.toDF.printSchema
root
|-- group: string (nullable = true)
|-- bd_value: decimal(38,18) (nullable = true)
|-- d_value: double (nullable = false)
// groupBy + max corrupts in the same way as converting to bytes via unscaled, then to BigDecimal with scale 18
rdd.groupBy("group").max("bd_value").show
+-----+-------------+
|group|max(bd_value)|
+-----+-------------+
| C| 1.23456E-13|
+-----+-------------+
// This BigDecimal is forced to have the same scale as the inferred schema
val rdd = spark.sparkContext.parallelize(Seq(Foo("C",BigDecimal(123456).setScale(18), 123456.0)))
// verified the scale is 18 in the DF
+-----+--------------------+--------+
|group| bd_value| d_value|
+-----+--------------------+--------+
| C|123456.0000000000...|123456.0|
+-----+--------------------+--------+
// And it works as expected
rdd1.groupBy("group").max("bd_value").show
+-----+--------------------+
|group| max(bd_value)|
+-----+--------------------+
| C|123456.0000000000...|
+-----+--------------------+
Это также объясняет, почему, как отмечено в комментарии, он прекрасно работает при преобразовании из RDD [Row] с явной схемой.
val rdd2 = spark.sparkContext.parallelize(Seq(Row("C", BigDecimal(123456), 123456.0)))
// schema has BigDecimal scale 18
val schema = StructType(Seq(StructField("group", StringType, true), StructField("bd_value", DecimalType(38,18), true), StructField("d_value",DoubleType,false)))
// createDataFrame interprets the value into the schema's scale
val df = spark.createDataFrame(rdd2, schema)
df.show
+-----+--------------------+--------+
|group| bd_value| d_value|
+-----+--------------------+--------+
| C|123456.0000000000...|123456.0|
+-----+--------------------+--------+