Как создать внешние таблицы из паркетных файлов в s3, используя hive 1.2? - PullRequest
2 голосов
/ 15 мая 2019

Я создал внешнюю таблицу в Qubole (Hive), которая читает паркетные файлы (сжатые: snappy) из s3, но при выполнении SELECT * table_name я получаю нулевые значения для всех столбцов, кроме столбца с разделами .

Я попытался использовать разные значения serialization.format в SERDEPROPERTIES, но я все еще сталкиваюсь с той же проблемой.И при удалении свойства 'serialization.format' = '1' я получаю ERROR: Failed with exception java.io.IOException:Can not read value at 0 in block -1 in file s3://path_to_parquet/.

Я проверил файлы паркета и смог прочитать данные с помощью parquet-tools:

**file_01.snappy.parquet:**
{"col_2":1234,"col_3":ABC}
{"col_2":124,"col_3":FHK}
{"col_2":12515,"col_3":UPO}


**External table stmt:**
CREATE EXTERNAL TABLE parquet_test
(
    col2 int,
    col3 string
)
PARTITIONED BY (col1 date) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
)
STORED AS PARQUET
LOCATION 's3://path_to_parquet'
TBLPROPERTIES ('parquet.compress'='SNAPPY');

Result:
col_1   col_2   col_3
5/3/19  NULL    NULL
5/4/19  NULL    NULL
5/5/19  NULL    NULL
5/6/19  NULL    NULL

Expected Result:
col_1   col_2   col_3
5/3/19  1234    ABC
5/4/19  124     FHK
5/5/19  12515   UPO
5/6/19  1234    ABC

1 Ответ

0 голосов
/ 19 мая 2019

Написание приведенного ниже ответа при условии, что таблица была создана с использованием Hive, и чтение с использованием Spark (поскольку вопрос помечен apache-spark-sql)

Как создавались данные?

Spark поддерживает чувствительную к регистру схему. Когда мы используем API-интерфейсы датафреймов, можно писать с использованием чувствительной к регистру схемы.
Пример:

scala> case class Employee(iD: Int, NaMe: String )
defined class Employee
scala> val df =spark.range(10).map(x => Employee(x.toInt, s"name$x")).write.save("file:///tmp/data/")
scala> spark.read.parquet("file:///tmp/data/").printSchema
root
 |-- iD: integer (nullable = true)
 |-- NaMe: string (nullable = true)

Обратите внимание, что в приведенном выше примере чувствительность к регистру сохраняется.
Когда мы создадим таблицу Hive поверх данных, созданных из Spark, Hive сможет правильно ее прочитать, поскольку она не чувствительна к регистру.
Принимая во внимание, что когда те же самые данные читаются с использованием Spark, он использует схему из Hive, которая по умолчанию является строчной, и возвращаются строки null.
Чтобы преодолеть это, Spark представил конфигурацию spark.sql.hive.caseSensitiveInferenceMode.

object HiveCaseSensitiveInferenceMode extends Enumeration {
  val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value
}

val HIVE_CASE_SENSITIVE_INFERENCE = buildConf("spark.sql.hive.caseSensitiveInferenceMode")
  .doc("Sets the action to take when a case-sensitive schema cannot be read from a Hive " +
    "table's properties. Although Spark SQL itself is not case-sensitive, Hive compatible file " +
    "formats such as Parquet are. Spark SQL must use a case-preserving schema when querying " +
    "any table backed by files containing case-sensitive field names or queries may not return " +
    "accurate results. Valid options include INFER_AND_SAVE (the default mode-- infer the " +
    "case-sensitive schema from the underlying data files and write it back to the table " +
    "properties), INFER_ONLY (infer the schema but don't attempt to write it to the table " +
    "properties) and NEVER_INFER (fallback to using the case-insensitive metastore schema " +
    "instead of inferring).")
  .stringConf
  .transform(_.toUpperCase(Locale.ROOT))
  .checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
  .createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)

INFER_AND_SAVE - Spark выводит схему и сохраняет ее в metastore как часть таблицы TBLEPROPERTIES (desc extended <table name> должен раскрыть это)
Если значение свойства равно NOT или INFER_AND_SAVE или INFER_ONLY, то Spark использует схему из таблицы метастазов и не сможет прочитать файлы паркета. Значение по умолчанию для свойства INFER_AND_SAVE, начиная с Spark 2.2.0.

Мы могли бы проверить следующее, чтобы увидеть, связана ли проблема с чувствительностью схемы:
1. Значение spark.sql.hive.caseSensitiveInferenceMode (spark.sql("set spark.sql.hive.caseSensitiveInferenceMode") должно раскрыть это)
2. Если данные созданы с использованием Spark
3. Если 2 истинно, проверьте, чувствительна ли схема к регистру (spark.read(<location>).printSchema) 4. если 3 использует регистрозависимую схему и вывод из 1 не равен INFER_AND_SAVE / INFER_ONLY, установите spark.sql("set spark.sql.hive.caseSensitiveInferenceMode=INFER_AND_SAVE"), удалите таблицу, заново создайте таблицу и попытайтесь прочитать данные из Spark.

...