UnsupportedOperationException: Unimplemented Тип: DoubleType - PullRequest
0 голосов
/ 08 июня 2018

Я пытаюсь записать pyspark df в Snowflake, используя функцию, которую я написал:

def s3_to_snowflake(schema, table):

    df = get_dataframe(schema, table, sqlContext)

    username = user
    password = passw
    account = acct

    snowflake_options = {
        "sfURL" : account+".us-east-1.snowflakecomputing.com",
        "sfAccount" : account,
        "sfUser" : username,
        "sfPassword" : password,
        "sfDatabase" : "database",
        "sfSchema" : schema,
        "sfWarehouse" : "demo_wh"
    }

    sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "KeyId")
    sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", 
"AccessKey")

    (
      df
      .write
      .format("net.snowflake.spark.snowflake")
      .mode("overwrite")
      .options(**snowflake_options)
      .option("dbtable", table)
      .option('tempDir', 's3://data-temp-loads/snowflake')
      .save()
    )

    print('Wrote {0} to {1}.'.format(table, schema))

Эта функция работала для всех таблиц, кроме одной, которые у меня есть в моем хранилище данных.Это схема таблицы, которую я пытаюсь написать.

root
|-- credit_transaction_id: string (nullable = true)
|-- credit_deduction_amt: double (nullable = true)
|-- credit_adjustment_time: timestamp (nullable = true)

Появляющаяся ошибка выглядит так, как будто Снежинка имеет проблемы с этим столбцом DoubleType.У меня раньше была эта проблема с Hive при использовании типов файлов Avro / ORC.Обычно это вопрос приведения одного типа данных к другому.

То, что я пробовал:

  • Кастинг (Double to Float, Double to String, Double to Numeric - последний за каждый Snowflake docs )
  • Перезапуск DDL входящей таблицы с попытками типов Float, String и Numeric

Еще одна вещь, на которую следует обратить внимание: некоторые таблицыу меня успешно перенесены столбцы DoubleType.Не уверен, что проблема с этой таблицей.

1 Ответ

0 голосов
/ 10 июня 2018

После поиска в Интернете, мне кажется, что Spark's Parquet reader генерирует эту ошибку:

https://github.com/apache/spark/blob/branch-2.0/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java

Ваши файлы определяют df Parquet?Я думаю, что это может быть ошибка чтения вместо ошибки записи;возможно, стоит взглянуть на то, что происходит в get_dataframe.

Спасибо, etduwx

...