Spark передает большой набор данных, не предназначенный для вещания - PullRequest
1 голос
/ 16 мая 2019

Я использую Spark 2.3.0, у меня есть 2 набора данных, оба они достаточно большие, 400 МБ +.Когда я присоединился к этим Spark, попытался передать одну из нихТот, у которого меньше столбцов (если он все равно помогает идентифицировать RCA).Сбой по причине Причины: java.util.concurrent.TimeoutException: время ожидания фьючерса истекло после ошибки [300 секунд], так как у меня есть настройки по умолчанию для соответствующих конфигов.

У меня есть настройки по умолчанию для spark.sql.broadcastTimeout и spark.sql.autoBroadcastJoinThreshold (10 МБ). Я не хочу максимально отключать вещание.

Во время сортировки я обнаружилчто если я установлю broadcastTimeout> 60, это будет работать, но не будет работать, так как размер набора данных увеличивается.Не знаете, почему spark не соблюдает autoBroadcastJoinThreshold?

Я не использую хранилище метаданных кустов, мои файлы хранятся в HDFS, я использую схему для них.

Пробовал добавлять фиктивные столбцы, как я полагаюв чудесах:)

    Dataset<Row> MergedById = fromValidFromField.as("df1")
            .join(filteredByMailIds.as("df2"),
                    col("df1.id")
                            .equalTo(col("df2.id")),"inner")

Это соединение, которое приводит к трансляции

1 Ответ

1 голос
/ 16 мая 2019

Spark принимает решение о широковещании, оценивая размер данных после операций (например, фильтра и т. Д.) С набором данных, не используя фактический размер набора данных.

Например: пусть b (id: Int, name: String) будет таблицей размером 1 ГБ (> широковещательный порог).

select * from a join b ON a.id = b.id AND b.id < 100

В приведенном выше примере используемой стратегией соединения будет Broadcast , поскольку фактически задействованные данные (предположим, 100 уникальных строк) в операции соединения очень меньше и будут меньше порогового значения по умолчанию в 10 МБ. .

Мы можем найти размер передаваемых данных, проанализировав план.

Let plan: LogicalPlan = df.queryExecution.optimizedPlan
val size = df.find(_.isInstanceOf[org.apache.spark.sql.catalyst.plans.logical.Join])
.get
.stats
.sizeInBytes

Это должно дать подсказки о том, соблюдается ли широковещательный порог (приведенные выше коды предполагают, что в запросе есть только одна операция соединения)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...