Перекрестное соединение в Apache Spark с набором данных выполняется очень медленно - PullRequest
0 голосов
/ 15 февраля 2019

Я разместил этот вопрос на форуме пользователей spark, но не получил ответа, поэтому спрашиваю его здесь снова.

У нас есть случай, когда нам нужно сделать декартово соединение, и по какой-то причине мы не можемзаставить его работать с API набора данных.

У нас есть два набора данных:

  • один набор данных с двумя строковыми столбцами, например, c1, c2.Это небольшой набор данных с ~ 1 млн. Записей.Оба столбца представляют собой строки из 32 символов, поэтому они должны быть не более 500 МБ.

    Мы передаем этот набор данных

  • другой набор данных немного больше с ~ 10 миллионами записей
val ds1 = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2")
ds1.count
val ds2 = spark.read.format("csv").load(<s3-location>).toDF("c11", "c12", "c13", "c14", "c15", "ts")
ds2.count
ds2.crossJoin(broadcast(ds1)).filter($"c1" <= $"c11" && $"c11" <= $"c2").count

ЕслиЯ реализую его с помощью RDD API, где я транслирую данные в ds1, а затем фильтрую данные в ds2, он работает нормально.

Я подтвердил, что трансляция прошла успешно.

2019-02-1423:11:55 INFO CodeGenerator: 54 - код, сгенерированный за 10.469136 мс 2019-02-14 23:11:55 INFO TorrentBroadcast: 54 - Начало чтения переменной трансляции 29 2019-02-14 23:11:55 INFO TorrentBroadcast: 54 -Чтение широковещательной переменной 29 заняло 6 мс 2019-02-14 23:11:56 INFO CodeGenerator: 54 - код, сгенерированный за 11,280087 мс

План запроса:

==Физический план ==
BroadcastNestedLoopJoin BuildRight, Cross, ((c1 # 68 <= c11 # 13) && (c11 # 13 <= c2 # 69)) <br>: - * Project []
: + -* Фильтр isnotnull (_c0 # 0)
: + - * FileScan CSV [_c0 # 0, _c1 # 1, _c2 # 2, _c3 # 3, _c4 # 4, _c5 # 5] Пакетная обработка: false, Формат: CSV,Расположение: InMemoryFileIndex [], PartitionFilters: [], PressedFilters: [IsNotNull (_c0)], ReadSchema: struct <_c0: строка, _c1: строка, _c2: строка, _c3: строка, _c4: строка, _c5: строка>
+ - BroadcastExchange IdentityBroadcastMode
+ - * Проект [c1 # 68, c2 # 69]
+ - * Фильтр (isnotnull (c1 # 68) && isnotnull (c2 # 69))
+ - * FileScan csv [c1 # 68, c2 # 69] Пакетное: false, Формат: CSV, Расположение: InMemoryFileIndex [], PartitionFilters: [], PressedFilters: [IsNotNull (c1), IsNotNull (c2)], ReadSchema: struct

затем этап не выполняется.

Я обновил код для использования широковещательной рассылки ds1, а затем произвел объединение в mapPartitions для ds2.

val ranges = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2").collect
val rangesBC = sc.broadcast(ranges)

, затем использовал этот rangeBC в методе mapPartitions, чтобыОпределите диапазон, к которому относится каждая строка в ds2, и это задание завершается через 3 часа, в то время как другое задание не завершается даже после 24 часов.Этот тип подразумевает, что оптимизатор запросов не делает то, что я хочу.

Что я делаю не так?Любые указатели будут полезны.Спасибо!

1 Ответ

0 голосов
/ 18 февраля 2019

Я не знаю, работаете ли вы на голом металле или на AWS со спотом или по требованию или выделенных, или на виртуальных машинах с AZURE и др.Мое мнение:

  • Примите во внимание, что 10M x 1M - это большая работа, даже если к результирующему перекрестному соединению применяется .filter.Это займет некоторое время.Каковы были ваши ожидания?
  • Spark - это в целом линейное масштабирование.
  • Центры обработки данных с виртуальными машинами не имеют выделенных и, следовательно, не обладают самой высокой производительностью.

Тогда:

  • Я работал на Databricks 10M x 100K в смоделированной конфигурации с ядром .86 и 6 ГБ на Driver for Community Edition.Это длилось 17 минут.
  • В вашем примере я запускал 10M x 1M на невыделенном кластере AWS EMR с 4 узлами (с некоторыми EMR-странностями, такими как резервирование драйвера в ценном экземпляре!), Потребовалось 3часы для частичного завершения.См. Рисунок ниже.

enter image description here

Итак, чтобы ответить на ваш вопрос: - Вы не ошиблись.

  • Просто нужно больше ресурсов, чтобы можно было больше распараллеливания.
  • Я добавил несколько явных разделов, как вы можете видеть.
...