Spark DataFrame Cast Column для совместимости с Kudu - PullRequest
0 голосов
/ 15 мая 2019

(Я новичок в Spark, Impala и Kudu.) Я пытаюсь скопировать таблицу из БД Oracle в таблицу Impala, имеющую такую ​​же структуру, в Spark, через Kudu. Я получаю сообщение об ошибке, когда код пытается сопоставить Oracle NUMBER с типом данных Kudu. Как я могу изменить тип данных Spark DataFrame, чтобы сделать его совместимым с Kudu?

Предполагается, что это будет копия 1: 1 из Oracle в Impala. Я извлек схему Oracle из исходной таблицы и создал целевую таблицу Impala с той же структурой (те же имена столбцов и разумное отображение типов данных). Я надеялся, что Spark + Kudu отобразит все автоматически и просто скопирует данные. Вместо этого Куду жалуется, что не может отобразить DecimalType(38,0).

Я хотел бы указать, что «столбец # 1 с именем SOME_COL, который в Oracle является NUMBER, должен быть сопоставлен с LongType, который поддерживается в Kudu».

Как я могу это сделать?

// This works
val df: DataFrame = spark.read
  .option("fetchsize", 10000)
  .option("driver", "oracle.jdbc.driver.OracleDriver")
  .jdbc("jdbc:oracle:thin:@(DESCRIPTION=...)", "SCHEMA.TABLE_NAME", partitions, props)

// This does not work  
kuduContext.insertRows(df.toDF(colNamesLower: _*), "impala::schema.table_name")
// Error: No support for Spark SQL type DecimalType(38,0)
// See https://github.com/cloudera/kudu/blob/master/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala

// So let's see the Spark data types
df.dtypes.foreach{case (colName, colType) => println(s"$colName: $colType")}
// Spark  data type: SOME_COL DecimalType(38,0)
// Oracle data type: SOME_COL NUMBER -- no precision specifier; values are int/long
// Kudu   data type: SOME_COL BIGINT

1 Ответ

1 голос
/ 16 мая 2019

Очевидно, мы можем указать пользовательскую схему при чтении из источника данных JDBC.

connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

Это сработало.Я смог указать customSchema, например, так:

col1 Long, col2 Timestamp, col3 Double, col4 String

, и с этим код работает:

import spark.implicits._
val df: Dataset[case_class_for_table] = spark.read
  .option("fetchsize", 10000)
  .option("driver", "oracle.jdbc.driver.OracleDriver")
  .jdbc("jdbc:oracle:thin:@(DESCRIPTION=...)", "SCHEMA.TABLE_NAME", partitions, props)
  .as[case_class_for_table]
kuduContext.insertRows(df.toDF(colNamesLower: _*), "impala::schema.table_name")
...