Spark sql Методы оптимизации загрузки CSV в или c формат улья - PullRequest
0 голосов
/ 02 апреля 2020

Привет, у меня есть данные 90 ГБ. В файле CSV я загружаю эти данные в одну временную таблицу, а затем из временной таблицы в таблицу или c, используя команду select insert, но для преобразования и загрузки данных в формат или c. 4 часа искры sql. Есть ли какая-либо техника оптимизации, которую я могу использовать, чтобы сократить это время. На данный момент я не использую никакой техники оптимизации, я просто использую spark sql и загружаю данные из CSV-файла в таблицу (textformat), а затем из этой временной таблицы в таблицу или c (с использованием select insert) с использованием spark submit как:

    spark-submit \
    --class class-name\
    --jar file

или я могу добавить любой дополнительный параметр в spark submit для улучшение оптимизации.

scala код (образец):

    All Imports
    object demo {
    def main(args: Array[String]) {
    //sparksession with enabled hivesuppport

    var a1=sparksession.sql("load data inpath 'filepath'  overwrite into table table_name")

    var b1=sparksession.sql("insert into tablename (all_column) select 'ALL_COLUMNS' from    source_table")

    }
    }

1 Ответ

1 голос
/ 02 апреля 2020

Я просто использую spark sql и загружаю данные из CSV-файла в таблицу (textformat), а затем из этой временной таблицы в таблицу или c (используя select insert)


Двухэтапный процесс здесь не нужен ..

  • Прочитайте кадр данных, как показано ниже: ...
val DFCsv = spark.read.format("csv")
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .load("yourcsv")

  • если вам необходимо нужно сделать repartition (может быть, это является причиной фактической 4-часовой задержки, поскольку вы этого не сделали), начиная с его большого файла, а затем ...

dfcsv.repartition(90) означает, что будет / может быть передел данные CSV в 90 почти равных частей. где 90 - номер образца. Вы можете упомянуть, что вы когда-либо хотите.

      DFCsv.write.format("orc")
    .partitionBy('yourpartitioncolumns')
    .saveAsTable('yourtable')

ИЛИ

     DFCsv.write.format("orc")
     .partitionBy('yourpartitioncolumns')
     .insertInto('yourtable')

Примечание: 1) Для больших данных вам нужно сделать перераспределение для равномерного распределения данных увеличит закономерность и, следовательно, производительность.

2) Если у вас нет столбцов patition и нет таблицы разделов, то в приведенных выше примерах нет необходимости partitionBy

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...