Искровая ошибка - достигнуто максимальное количество итераций (100) для разрешения партии - PullRequest
0 голосов
/ 09 апреля 2020

Я работаю над Spark SQL, где мне нужно выяснить разницу между двумя большими CSV.

Разница должна дать: -

  • Вставленные строки или новые Запись // Сравнение только идентификаторов

  • Измененные строки (не включая вставленные) - Сравнение всех значений столбцов

  • Удаленные строки // Сравнение только идентификаторов

Spark 2.4.4 + Java

Я использую блоки данных для чтения / записи CSV

Dataset<Row> insertedDf = newDf_temp.join(oldDf_temp,oldDf_temp.col(key)
                .equalTo(newDf_temp.col(key)),"left_anti");
Long insertedCount = insertedDf.count();
logger.info("Inserted File Count == "+insertedCount);


Dataset<Row> deletedDf = oldDf_temp.join(newDf_temp,oldDf_temp.col(key)
                .equalTo(newDf_temp.col(key)),"left_anti")
                .select(oldDf_temp.col(key));
Long deletedCount = deletedDf.count();
logger.info("deleted File Count == "+deletedCount);


Dataset<Row> changedDf = newDf_temp.exceptAll(oldDf_temp); // This gives rows (New +changed Records)

Dataset<Row> changedDfTemp = changedDf.join(insertedDf, changedDf.col(key)
                .equalTo(insertedDf.col(key)),"left_anti"); // This gives only changed record

Long changedCount = changedDfTemp.count();
logger.info("Changed File Count == "+changedCount);

Это хорошо работает для CSV со столбцами до 50 или около того.

The Above code fails for one row in CSV with 300+columns, so I am sure this is not file Size problem.

Но если у меня CSV, имеющий более 300 столбцов, он завершается с исключением

Максимальное количество итераций (100) достигнуто для разрешения партии - Искровая ошибка

If I set the below property in Spark, It Works!!!

sparkConf.set ("spark. sql .optimizer.maxIterations", "500");

Но мой вопрос: почему я должен установить это?

Что-то не так, что я делаю? Или это поведение ожидается для CSV, которые имеют большие столбцы.

Могу ли я оптимизировать его любым способом для обработки больших столбцов CSV.

1 Ответ

1 голос
/ 10 апреля 2020

Проблема, с которой вы сталкиваетесь, связана с тем, как spark принимает инструкции, которые вы им рассказываете, и преобразует их в реальные вещи, которые он собирается выполнять. Сначала нужно понять ваши инструкции, запустив Analyzer, затем он пытается улучшить их, запустив свой оптимизатор. Эта настройка применима к обоим.

В частности, ваш код взрывается во время шага в Анализаторе. Анализатор отвечает за выяснение того, к чему вы на самом деле обращаетесь, когда вы обращаетесь к вещам. Например, сопоставление имен функций с реализациями или сопоставление имен столбцов по переименованиям и различные преобразования. Он делает это за несколько проходов, разрешая дополнительные вещи за каждый проход, затем снова проверяя, может ли он разрешить перемещение.

Я думаю, что происходит в вашем случае, каждый проход, вероятно, разрешает один столбец, но 100 проходов не выполняется ' Достаточно, чтобы решить все столбцы. Увеличивая его, вы даете ему достаточно пропусков, чтобы иметь возможность полностью выполнить ваш план. Это, безусловно, красный флаг для потенциальной проблемы с производительностью, но если ваш код работает, вы можете просто увеличить значение и не беспокоиться об этом.

Если это не сработает, вам, вероятно, придется попробовать что-то предпринять, чтобы уменьшить количество столбцов, используемых в вашем плане. Возможно объединение всех столбцов в один закодированный строковый столбец в качестве ключа. Вы могли бы извлечь пользу из контрольной точки данных перед выполнением объединения, чтобы вы могли сократить свой план.

РЕДАКТИРОВАТЬ:

Кроме того, я бы реорганизовал ваш код выше, чтобы вы могли сделать все это только с одним соединением. Это должно быть намного быстрее и может решить вашу другую проблему.

Каждое объединение приводит к случайному перемешиванию (передача данных между вычислительными узлами), которое добавляет время к вашей работе. Вместо того, чтобы вычислять, добавлять, удалять и изменять независимо, вы можете сделать их все сразу. Что-то вроде приведенного ниже кода. Это в scala коде psuedo, потому что я знаком с этим больше, чем Java API.

import org.apache.spark.sql.functions._

var oldDf = ..
var newDf = ..
val changeCols = newDf.columns.filter(_ != "id").map(col)

// Make the columns you want to compare into a single struct column for easier comparison
newDf = newDF.select($"id", struct(changeCols:_*) as "compare_new")
oldDf = oldDF.select($"id", struct(changeCols:_*) as "compare_old")

// Outer join on ID
val combined = oldDF.join(newDf, Seq("id"), "outer")

// Figure out status of each based upon presence of old/new
//  IF old side is missing, must be an ADD
//  IF new side is missing, must be a DELETE
//  IF both sides present but different, it's a CHANGE
//  ELSE it's NOCHANGE
val status = when($"compare_new".isNull, lit("add")).
             when($"compare_old".isNull, lit("delete")).
             when($"$compare_new" != $"compare_old", lit("change")).
             otherwise(lit("nochange"))

val labeled = combined.select($"id", status)

На данный момент у нас есть каждый идентификатор, помеченный как ADD / DELETE / CHANGE / NOCHANGE, поэтому мы можем просто groupBy / кол. Эту агг можно сделать почти полностью на стороне карты, поэтому она будет намного быстрее, чем объединение.

labeled.groupBy("status").count.show
...