Ошибка при отображении данных из файла CSV в таблицу Hive на HDFS - PullRequest
0 голосов
/ 14 февраля 2019

Я пытаюсь загрузить фрейм данных в таблицу Hive, выполнив следующие действия:

  1. Считайте исходную таблицу и сохраните фрейм данных в виде CSV-файла на HDFS

    val yearDF = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2016").option("user", devUserName).option("password", devPassword).option("partitionColumn","header_id").option("lowerBound", 199199).option("upperBound", 284058).option("numPartitions",10).load()
    
  2. Упорядочить столбцы согласно моим столбцам таблицы Hive. Мои столбцы таблицы Hive представлены в виде строки в следующем формате:

    val hiveCols = col1:coldatatype|col2:coldatatype|col3:coldatatype|col4:coldatatype...col200:datatype
    val schemaList        = hiveCols.split("\\|")
    val hiveColumnOrder   = schemaList.map(e => e.split("\\:")).map(e => e(0)).toSeq
    val finalDF           = yearDF.selectExpr(hiveColumnOrder:_*)
    

    Порядок столбцов, которые я читаюв «execQuery» такие же, как «hiveColumnOrder», и просто чтобы убедиться в порядке, я еще раз выбираю столбцы в yearDF, используя selectExpr

  3. Сохранение кадра данных в виде файла CSV на HDFS:

    newDF.write.format("CSV").save("hdfs://username/apps/hive/warehouse/database.db/lines_test_data56/")
    
  4. После сохранения кадра данных я беру те же столбцы из «hiveCols», подготавливаю DDL для создания таблицы кустов в том же месте со значениями, разделенными запятойприведены ниже:

создать таблицу, если не существует schema.tablename (col1 coldatatype, col2 coldatatype, col3 coldatatype, col4 coldatatype ... col200 datatype)
ROW FORMAT DELIMITEDПОЛЯ, ПРЕКРАЩЕННЫЕ'
хранится в виде текста

LOCATION' hdfs: //username/apps/hive/warehouse/database.db/lines_test_data56/ ';

После загрузки кадра данных вПри создании таблицы проблема, с которой я сталкиваюсь, заключается в том, что, когда я запрашиваю таблицу, я получаю неправильный вывод в запросе.Например: если я применяю приведенный ниже запрос к кадру данных перед сохранением его в виде файла:

finalDF.createOrReplaceTempView("tmpTable")
select header_id,line_num,debit_rate,debit_rate_text,credit_rate,credit_rate_text,activity_amount,activity_amount_text,exchange_rate,exchange_rate_text,amount_cr,amount_cr_text from tmpTable where header_id=19924598 and line_num=2

, я получаю вывод правильно.Все значения правильно выровнены по столбцам:

[19924598,2,null,null,381761.40000000000000000000,381761.4,-381761.40000000000000000000,-381761.4,0.01489610000000000000,0.014896100000000,5686.76000000000000000000,5686.76]

Но после сохранения кадра данных в CSV-файле создайте таблицу поверх него (шаг 4) и примените тот же запрос к созданной таблице, которую я вижуданные перемешаны и неправильно сопоставлены со столбцами:

select header_id,line_num,debit_rate,debit_rate_text,credit_rate,credit_rate_text,activity_amount,activity_amount_text,exchange_rate,exchange_rate_text,amount_cr,amount_cr_text from schema.tablename where header_id=19924598 and line_num=2

+---------------+--------------+-------------+------------------+-------------+------------------+--------------------------+-------------------------------+------------------------+-----------------------------+--------------------+-------------------------+--+
| header_id     | line_num     | debit_rate  | debit_rate_text  | credit_rate  | credit_rate_text  | activity_amount  | activity_amount_text  | exchange_rate  | exchange_rate_text  | amount_cr  | amount_cr_text  |
+---------------+--------------+-------------+------------------+-------------+------------------+--------------------------+-------------------------------+------------------------+-----------------------------+--------------------+-------------------------+--+
| 19924598      | 2            | NULL        |                  | 381761.4    |                    | 5686.76          | 5686.76               | NULL           | -5686.76            | NULL       |                 |

Поэтому я попытался использовать другой подход, при котором я заранее создал таблицу кустов и вставил в нее данные из фрейма данных:

  • Запуск DDL на шаге 4 выше
  • finalDF.createOrReplaceTempView ("tmpTable")
  • spark.sql ("вставить в схему. Select select * from tmpTable")

И даже этот способ не работает, если я выполню вышеупомянутый запрос на выборку после завершения задания.Я попытался обновить таблицу, используя refresh table schema.table и msckrepair table schema.table, просто чтобы посмотреть, есть ли какие-либо проблемы с метаданными, но кажется, что ничего не получается.

Может кто-нибудь сообщить мне, что является причиной этого явления, есть лиЕсть какие-либо проблемы с тем, как я здесь работаю с данными?

Ответы [ 2 ]

0 голосов
/ 23 февраля 2019

Я использовал формат строки serde: org.apache.hadoop.hive.serde2.OpenCSVSerde в Hive DDL.Это также имеет символ ',' в качестве разделителя символов по умолчанию, и мне не нужно было вводить какой-либо другой разделитель.

0 голосов
/ 19 февраля 2019

Коды проверяются с использованием Spark 2.3.2


Вместо создания информационного кадра Spark из файла CSV и его регистрации в виде таблицы Hive, вы можете легко запускать команды SQL и создавать Hive.Таблицы из CSV-файла

val conf = new SparkConf
    conf
      .set("hive.server2.thrift.port", "10000")
      .set("spark.sql.hive.thriftServer.singleSession", "true")
      .set("spark.sql.warehouse.dir", "hdfs://PATH_FOR_HIVE_METADATA")
      .set("spark.sql.catalogImplementation","hive")
      .setMaster("local[*]")
      .setAppName("ThriftServer")

val spark = SparkSession.builder()
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()

Теперь используя объект spark, вы можете запускать команду SQL от имени пользователя Hive:

spark.sql("DROP DATABASE IF EXISTS my_db CASCADE")
spark.sql("create database if not exists my_db")
spark.sql("use my_db")

Используя следующий код, вы можете загрузить все файлы csv_files в каталог HDFS.(или вы можете указать путь только к одному CSV-файлу):

spark.sql(
      "CREATE TABLE test_table(" +
        "id int," +
        "time_stamp bigint," +
        "user_name string) " +
        "ROW FORMAT DELIMITED " +
        "FIELDS TERMINATED BY ',' " +
        "STORED AS TEXTFILE " +
        "LOCATION 'hdfs://PATH_TO_CSV_Directory_OR_CSV_FILE' "
    )

И в конце зарегистрируйте объект Spark sqlContext как Hive ThriftServer:

HiveThriftServer2.startWithContext(spark.sqlContext)

Это создаст конечную точку ThriftServer на порту 10000.

INFO ThriftCLIService: Starting ThriftBinaryCLIService on port 10000 with 5...500 worker threads

Теперь вы можете запустить beeline и подключиться к ThriftServer:

beeline> !connect jdbc:hive2://localhost:10000
Connecting to jdbc:hive2://localhost:10000
Enter username for jdbc:hive2://localhost:10000: enter optional_username
Enter password for jdbc:hive2://localhost:10000: leave blank
Connected to: Spark SQL (version 2.3.2)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10000>

и проверить, создана ли таблица test_tableв my_db database:

0: jdbc:hive2://localhost:10000> use my_db;
0: jdbc:hive2://localhost:10000> show tables ;
+-----------+-----------------------+--------------+--+
| database  |       tableName       | isTemporary  |
+-----------+-----------------------+--------------+--+
| my_db     | test_table            | false        |
+-----------+-----------------------+--------------+--+

Кроме того, вы можете создать любую другую таблицу Hive (или любую команду HiveQL), используя конечную точку ThrifServer JDBC.

Вот необходимые зависимости:

 libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.spark" %% "spark-hive-thriftserver" % sparkVersion,
  "org.apache.hadoop" % "hadoop-hdfs" % "2.8.3",
  "org.apache.hadoop" % "hadoop-common" % "2.8.3",
)
...