Почему Spark groupBy.agg (min / max) BigDecimal всегда возвращает 0? - PullRequest
0 голосов
/ 12 февраля 2019

Я пытаюсь сгруппировать по одному столбцу DataFrame и генерировать значения min и max столбца BigDecimal в каждой из полученных групп.Результаты всегда дают очень маленькое (приблизительно 0) значение.

(Подобные вызовы min/max для столбца Double приводят к ожидаемым ненулевым значениям.)

В качестве простого примера:

Если я создаю следующий DataFrame:

import org.apache.spark.sql.{functions => f}

case class Foo(group: String, bd_value: BigDecimal, d_value: Double)

val rdd = spark.sparkContext.parallelize(Seq(
  Foo("A", BigDecimal("1.0"), 1.0),
  Foo("B", BigDecimal("10.0"), 10.0),
  Foo("B", BigDecimal("1.0"), 1.0),
  Foo("C", BigDecimal("10.0"), 10.0),
  Foo("C", BigDecimal("10.0"), 10.0),
  Foo("C", BigDecimal("10.0"), 10.0)
))

val df = rdd.toDF()

Выбор max для столбца Double или BigDecimal возвращает ожидаемый результат:

df.select(f.max("d_value")).show()

// +------------+
// |max(d_value)|
// +------------+
// |        10.0|
// +------------+

df.select(f.max("bd_value")).show()

// +--------------------+
// |       max(bd_value)|
// +--------------------+
// |10.00000000000000...|
// +--------------------+

Но если яПри группировании и агрегировании я получаю разумный результат для столбца Double, но близкие к нулю значения для столбца BigDecimal:

df.groupBy("group").agg(f.max("d_value")).show()

// +-----+------------+
// |group|max(d_value)|
// +-----+------------+
// |    B|        10.0|
// |    C|        10.0|
// |    A|         1.0|
// +-----+------------+

df.groupBy("group").agg(f.max("bd_value")).show()

// +-----+-------------+
// |group|max(bd_value)|
// +-----+-------------+
// |    B|     1.00E-16|
// |    C|     1.00E-16|
// |    A|      1.0E-17|
// +-----+-------------+

Почему искра возвращает нулевой результат для этих вызовов min/max?

1 Ответ

0 голосов
/ 14 февраля 2019

TL; DR

Кажется, существует несоответствие в том, как Spark относится к шкале BigDecimals, которая проявляется в конкретном случае, показанном в вопросе.Код ведет себя так, как будто он преобразует BigDecimal s в немасштабированный Long s, используя масштаб объекта BigDecimal, но затем преобразует обратно в BigDecimal, используя масштаб схемы.

Эту проблему можно обойти, либо

  • , явно установив масштаб на все значения BigDecimal, чтобы соответствовать схеме DataFrame, используя setScale, либо
  • вручнуюуказание схемы и создание DF из RDD [Row]

Длинная версия

Вот что я думаю происходит на моей машине с Spark 2.4.0.

В случае groupBy.max Spark выполняет UnsafeRow и преобразует BigDecimal в немасштабированный Long и сохраняет его как массив байтов в setDecimal в эта строка (проверяется с помощью операторов печати).Затем, когда он позже вызывает getDecimal , он преобразует байтовый массив обратно в BigDecimal с использованием масштаба , указанного в схеме .

Если масштаб в исходном значении не соответствует масштабу в схеме, это приводит к неверному значению.Например,

val foo = BigDecimal(123456)
foo.scale
0

val bytes = foo.underlying().unscaledValue().toByteArray()

// convert the bytes into BigDecimal using the original scale -- correct value
val sameValue = BigDecimal(new java.math.BigInteger(bytes), 0)
sameValue: scala.math.BigDecimal = 123456

// convert the bytes into BigDecimal using scale 18 -- wrong value
val smaller = BigDecimal(new java.math.BigInteger(bytes), 18)
smaller: scala.math.BigDecimal = 1.23456E-13

Если я просто выберу максимум из столбца bd_value, Spark, похоже, не пройдет setDecimal.Я не проверял, почему или куда он идет.

Но это объяснило бы значения, наблюдаемые в вопросе.Использование того же класса наблюдений Foo:

// This BigDecimal has scale 0
val rdd = spark.sparkContext.parallelize(Seq(Foo("C", BigDecimal(123456), 123456.0)))

// And shows with scale 0 in the DF
rdd.toDF.show
+-----+--------+--------+
|group|bd_value| d_value|
+-----+--------+--------+
|    C|  123456|123456.0|
+-----+--------+--------+

// But the schema has scale 18
rdd.toDF.printSchema
root
 |-- group: string (nullable = true)
 |-- bd_value: decimal(38,18) (nullable = true)
 |-- d_value: double (nullable = false)


// groupBy + max corrupts in the same way as converting to bytes via unscaled, then to BigDecimal with scale 18
rdd.groupBy("group").max("bd_value").show
+-----+-------------+
|group|max(bd_value)|
+-----+-------------+
|    C|  1.23456E-13|
+-----+-------------+

// This BigDecimal is forced to have the same scale as the inferred schema
val rdd = spark.sparkContext.parallelize(Seq(Foo("C",BigDecimal(123456).setScale(18), 123456.0)))

// verified the scale is 18 in the DF
+-----+--------------------+--------+
|group|            bd_value| d_value|
+-----+--------------------+--------+
|    C|123456.0000000000...|123456.0|
+-----+--------------------+--------+


// And it works as expected
rdd1.groupBy("group").max("bd_value").show

+-----+--------------------+
|group|       max(bd_value)|
+-----+--------------------+
|    C|123456.0000000000...|
+-----+--------------------+

Это также объясняет, почему, как отмечено в комментарии, он прекрасно работает при преобразовании из RDD [Row] с явной схемой.

val rdd2 = spark.sparkContext.parallelize(Seq(Row("C", BigDecimal(123456), 123456.0)))

// schema has BigDecimal scale 18
val schema = StructType(Seq(StructField("group", StringType, true), StructField("bd_value", DecimalType(38,18), true), StructField("d_value",DoubleType,false)))

// createDataFrame interprets the value into the schema's scale
val df = spark.createDataFrame(rdd2, schema)

df.show

+-----+--------------------+--------+
|group|            bd_value| d_value|
+-----+--------------------+--------+
|    C|123456.0000000000...|123456.0|
+-----+--------------------+--------+
...