Spark Dataframe, обеспечивающий различные уровни точности для поплавков - PullRequest
0 голосов
/ 06 февраля 2020

Когда мы создаем искровой фрейм данных, мы отправляем данные в фрейме данных в Куду и Кафку (который, в свою очередь, подбирается и переходит в S3)

Однако точность от того, что мы видим в Куду однажды датафрейм выгружен 1e-15 Kudu Precision

Теперь, если я использую тот же самый фрейм данных и преобразую его в формат, который Kafka может использовать

override def getKafkaDataFrame(df: DataFrame) : DataFrame = {
df.withColumn("key", to_json(struct(df.col(PK1),
  df.col(PK2)))
  .withColumn("value", to_json(struct(df.columns.map(col): _*)))

, который в Ход (в отдельном приложении) отправляет на S3

Получается, как это для той же строки, которая 1e-6

S3 Precision

У нас также есть метод перехода к Kudu на S3 напрямую (это не поток), и эта точность соответствует Kudu, когда inferSchema имеет значение true, а затем простой dataframe.write

Мне интересно, где эта потеря От чего зависит точность / масштаб, и как мне go исправить это.

Что было предпринято: преобразование всех полей в строковые, преобразование только полей с плавающей запятой в удвоенные числа

Спасибо

Ответы [ 2 ]

2 голосов
/ 07 февраля 2020

Не уверен, какой источник данных использовался для фрейма данных, но, скорее всего, проблема вызвана схемой и типом данных для столбца с проблемой точности. FloatType недостаточно для обработки точности 1e-15, поскольку он представляет 4-байтовые числа с плавающей запятой одинарной точности.

Маленький пример

object DecisionPlays {
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.types.FloatType

  case class DoubleColumn(doubleValue:Double)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val temp = DoubleColumn(1.12345678910111213)
    val df = spark.createDataFrame(Seq(temp))
    //[1.1234567891011122,{"doubleValue":1.1234567891011122}]
    df.withColumn("value", to_json(struct(df.columns.map(col): _*))).collect().foreach(println)
    //[1.1234568,{"doubleValue":1.1234568}]
    df.select(col("doubleValue").cast(FloatType))
      .withColumn("value", to_json(struct(df.columns.map(col): _*))).collect().foreach(println)

  }
}
0 голосов
/ 07 февраля 2020

Взяв то, что Андрей сказал о искровых поплавках

Я решил создать одно поле, которое было поплавком, и заменить его на двойное везде в коде

И это сработало!

Куду уже меняет все числа с плавающей запятой на двойные, поэтому никаких изменений данных там не произошло, но теперь данные верны и в s3

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...