Как оптимизировать разбиение при переносе данных из источника JDBC? - PullRequest
0 голосов
/ 02 октября 2018

Я пытаюсь переместить данные из таблицы в таблице PostgreSQL в таблицу Hive в HDFS.Для этого я придумал следующий код:

  val conf  = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.sql.orc.filterPushdown","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max","512m").set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.yarn.driver.memoryOverhead","7168").set("spark.yarn.executor.memoryOverhead","7168").set("spark.sql.shuffle.partitions", "61").set("spark.default.parallelism", "60").set("spark.memory.storageFraction","0.5").set("spark.memory.fraction","0.6").set("spark.memory.offHeap.enabled","true").set("spark.memory.offHeap.size","16g").set("spark.dynamicAllocation.enabled", "false").set("spark.dynamicAllocation.enabled","true").set("spark.shuffle.service.enabled","true")
  val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
  def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {
        val colList                = allColumns.split(",").toList
        val (partCols, npartCols)  = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
        val queryCols              = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
        val execQuery              = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
        val yearDF                 = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017")
                                                                      .option("user", devUserName).option("password", devPassword)
                                                                      .option("partitionColumn","cast_id")
                                                                      .option("lowerBound", 1).option("upperBound", 100000)
                                                                      .option("numPartitions",70).load()
        val totalCols:List[String] = splitColumns ++ textList
        val cdt                    = new ChangeDataTypes(totalCols, dataMapper)
        hiveDataTypes              = cdt.gpDetails()
        val fc                     = prepareHiveTableSchema(hiveDataTypes, partition_columns)
        val allColsOrdered         = yearDF.columns.diff(partition_columns) ++ partition_columns
        val allCols                = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
        val resultDF               = yearDF.select(allCols:_*)
        val stringColumns          = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
        val finalDF                = stringColumns.foldLeft(resultDF) {
          (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+"," "))
        }
        finalDF
  }
    val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
    val dataDFPart = dataDF.repartition(30)
    dataDFPart.createOrReplaceTempView("preparedDF")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")

Данные вставляются в таблицу кустов, динамически разбитых на основе prtn_String_columns: source_system_name, period_year, period_num

Используется Spark-submit:

SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar  --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar --num-executors 80 --executor-cores 5 --executor-memory 50G --driver-memory 20G --driver-cores 3 --class com.partition.source.YearPartition splinter_2.11-0.1.jar --master=yarn --deploy-mode=cluster --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal fdlhdpetl@FDLDEV.COM --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar

Следующие сообщения об ошибках генерируются в журналах исполнителя:

Container exited with a non-zero exit code 143.
Killed by external signal
18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system]
java.lang.OutOfMemoryError: Java heap space
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88)
    at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393)
    at java.util.zip.ZipFile.getInputStream(ZipFile.java:374)
    at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
    at java.util.jar.JarFile.getManifest(JarFile.java:180)
    at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:745)

Я вижу в журналах, что чтение выполняется правильно с заданным количеством разделов, как показано ниже:

Scan JDBCRelation((select column_names from schema.tablename where period_year='2017' and period_num='12') as year2017) [numPartitions=50]

Ниже приведено поэтапное состояние исполнителей: enter image description here

enter image description here

enter image description here

enter image description here

Данные разделены неправильно.Один раздел меньше, а другой становится огромным.Здесь есть проблема перекоса.При вставке данных в таблицу Hive задание завершается с ошибкой в ​​строке: spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF"), но я понимаю, что это происходит из-за проблемы перекоса данных.

Я пытался увеличить количество исполнителей, увеличивая память исполнителя, драйверпамяти, пытался просто сохранить как файл csv вместо сохранения кадра данных в таблицу Hive, но ничто не влияет на выполнение из исключения:

java.lang.OutOfMemoryError: GC overhead limit exceeded

Есть ли в коде что-то, что мне нужно исправить?Может кто-нибудь сообщить мне, как я могу решить эту проблему?

Ответы [ 3 ]

0 голосов
/ 07 октября 2018

Был еще один ваш вопрос, перенаправленный сюда как дубликат

 'How to avoid data skewing while reading huge datasets or tables into spark? 
  The data is not being partitioned properly. One partition is smaller while the 
  other one becomes huge on read.
  I observed that one of the partition has nearly 2million rows and 
  while inserting there is a skew in partition. '

, если проблема заключается в работе с данными, которые разделены в кадре данных после чтения, вы пытались увеличить значение «numPartitions»??

.option("numPartitions",50)

lowerBound, upperBound шаг разделов формы для сгенерированных выражений и числовых выражений предложения WHERE определяет число разбиений.

скажем, например, у sometable есть идентификатор столбца (мы выбираем это как partitionColumn);Диапазон значений, который мы видим в таблице для столбца- ID, составляет от 1 до 1000, и мы хотим получить все записи, запустив select * from sometable, поэтому мы собираемся с lowerbound = 1 & upperbound = 1000 и numpartition = 4

это создаст кадр данных из 4 разделов с результатом каждого запроса путем построения sql на основе нашего фида (lowerbound = 1 & upperbound = 1000 and numpartition = 4)

select * from sometable where ID < 250
select * from sometable where ID >= 250 and ID < 500
select * from sometable where ID >= 500 and ID < 750
select * from sometable where ID >= 750

что, если большинство записей в нашей таблице попадет в диапазон ID(500,750).Вы попадаете в такую ​​ситуацию.

Когда мы увеличиваем числовое разделение, разделение происходит еще дальше, и это уменьшает объем записей в том же разделе, но это не лучший вариант.

Вместо этогорасщепления искры partitioncolumn на основе предоставленных нами границ, если вы думаете о том, чтобы самостоятельно разделить расщепление, данные могут быть равномерно разделены.вам нужно переключиться на другой метод JDBC, где вместо (lowerbound,upperbound & numpartition) мы можем предоставить предикаты напрямую.

def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame 

Link

0 голосов
/ 08 октября 2018

По моему опыту, есть 4 вида настроек памяти, которые имеют значение:

A) [1] Память для хранения данных по причинам обработки VS [2] Пространство кучи для хранения стека программ

B) [1] Driver VS [2] память исполнителя

До сих пор мне всегда удавалось успешно запускать задания Spark, увеличивая соответствующий тип памяти:

Таким образом, A2-B1 будет иметь память, доступную в драйвере для хранения стека программ.И т. Д.

Имена свойств следующие:

A1-B1) executor-memory

A1-B2) driver-memory

A2-B1) spark.yarn.executor.memoryOverhead

A2-B2) spark.yarn.driver.memoryOverhead

Имейте в виду, что сумма всех * -B1 должна быть меньшеобъем доступной памяти на ваших рабочих и сумма всех * -B2 должны быть меньше, чем объем памяти на вашем узле драйвера.

Могу поспорить, что виновником является одна из смело отмеченных настроек кучи.

0 голосов
/ 06 октября 2018
  1. Определите, сколько разделов вам нужно, учитывая количество входных данных и ресурсы вашего кластера.Как правило, лучше вводить разделы размером менее 1 ГБ, если в этом нет особой необходимости.и строго меньше, чем ограничение размера блока.

    Вы ранее указали , что переносите 1 ТБ значений данных, которые вы используете в разных сообщениях (5 - 70), - это, вероятно, путь к низкому уровню, чтобы обеспечитьплавный процесс.

    Попробуйте использовать значение, которое больше не потребует repartitioning.

  2. Знайте свои данные.

    Анализ столбцов, доступных в наборе данных, чтобы определить, есть ли какие-либо столбцы с высокой мощностью и равномерным распределением, которые будут распределены по требуемому количеству разделов.Это хорошие кандидаты на процесс импорта.Кроме того, вы должны определить точный диапазон значений.

    Агрегации с различной степенью центральности и асимметрии, а также гистограммы и основные подсчеты по ключам являются хорошими инструментами исследования.Для этой части лучше анализировать данные непосредственно в базе данных, а не извлекать их в Spark.

    В зависимости от СУБД вы можете использовать width_bucket (PostgreSQL, Oracle) или эквивалентную функцию для полученияхорошая идея, как данные будут распределяться в Spark после загрузки с partitionColumn, lowerBound, upperBound, numPartitons.

    s"""(SELECT width_bucket($partitionColum, $lowerBound, $upperBound, $numPartitons) AS bucket, COUNT(*)
    FROM t
    GROUP BY bucket) as tmp)"""
    
  3. Если нет столбцов, которыеудовлетворяйте вышеуказанным критериям:

    • Создание пользовательского и показ его через.вид.Хэши нескольких независимых столбцов обычно являются хорошими кандидатами.Пожалуйста, обратитесь к руководству по вашей базе данных, чтобы определить функции, которые можно использовать здесь (DBMS_CRYPTO в Oracle, pgcrypto в PostgreSQL) *.
    • Использование набора независимых столбцов, которые вместе взятые, обеспечивают достаточно высокую мощность.

      По желанию, если вы собираетесь записывать в многораздельную таблицу Hive, следует рассмотреть возможность включения столбцов разбиения Hive.Это может ограничить количество файлов, сгенерированных позже.

  4. Подготовка аргументов разделения

    • Если столбец выбран или создан впредыдущие шаги являются числовыми ( или метка даты / времени в Spark> = 2.4 ), предоставьте его непосредственно как partitionColumn и используйте значения диапазона, определенные ранее, чтобы заполнить lowerBound и upperBound.

      Если привязанные значения не отражают свойства данных (min(col) для lowerBound, max(col) для upperBound), это может привести к значительному искажению данных, поэтому следите внимательно.В худшем случае, когда границы не охватывают диапазон данных, все записи будут выбираться на одной машине, что делает его не лучше, чем полное отсутствие разделения.

    • ЕслиСтолбец, выбранный на предыдущих этапах, является категориальным или представляет собой набор столбцов, генерирующих список взаимоисключающих предикатов , которые полностью охватывают данные, в форме, которая может использоваться в предложении SQL where.

      Например, если у вас есть столбец A со значениями {a1, a2, a3} и столбец B со значениями {b1, b2, b3}:

      val predicates = for {
        a <- Seq("a1", "a2", "a3")
        b <- Seq("b1", "b2", "b3")
      } yield s"A = $a AND B = $b"
      

      Дважды проверьте, что условия не перекрываются и все комбинации покрыты.Если эти условия не выполняются, вы получаете дубликаты или недостающие записи соответственно.

      Передача данных в качестве аргумента predicates в вызов jdbc.Обратите внимание, что количество разделов будет точно равно количеству предикатов.

  5. Переведите базу данных в режим только для чтения (любые продолжающиеся операции записи могут привести к несогласованности данныхЕсли возможно, вам следует заблокировать базу данных перед началом всего процесса, но, если это невозможно, в вашей организации).

  6. Если количество разделов соответствует требуемым выходным данным загрузки безrepartition и дамп непосредственно в приемник, если нет, вы можете попытаться перераспределить, следуя тем же правилам, что и в шаге 1.

  7. Если у вас все еще есть какие-либо проблемы, убедитесь, что вы 'Правильно ли настроены параметры памяти Spark и GC.

  8. Если ничего из вышеперечисленного не работает:

    • Подумайте о том, чтобы сбросить данные в сеть / распределить хранилище с помощью таких инструментов, как COPY TO, и прочитать их прямо оттуда.

      Обратите внимание, что для стандартных утилит базы данных вам обычно требуется POSIX-совместимая файловая систематак что HDFS обычно не подойдет.

      Преимущество этого подхода заключается в том, что вам не нужно беспокоиться о свойствах столбца, и нет необходимости переводить данные в режим только для чтения, чтобы обеспечить согласованность.

    • Использование специальных инструментов массовой передачи, таких как Apache Sqoop, и последующее изменение формы данных.


* Не использовать псевдоколонки - псевдостолбцы в Spark JDBC .

...