Я разместил этот вопрос на форуме пользователей spark, но не получил ответа, поэтому спрашиваю его здесь снова.
У нас есть случай, когда нам нужно сделать декартово соединение, и по какой-то причине мы не можемзаставить его работать с API набора данных.
У нас есть два набора данных:
- один набор данных с двумя строковыми столбцами, например, c1, c2.Это небольшой набор данных с ~ 1 млн. Записей.Оба столбца представляют собой строки из 32 символов, поэтому они должны быть не более 500 МБ.
Мы передаем этот набор данных
- другой набор данных немного больше с ~ 10 миллионами записей
val ds1 = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2")
ds1.count
val ds2 = spark.read.format("csv").load(<s3-location>).toDF("c11", "c12", "c13", "c14", "c15", "ts")
ds2.count
ds2.crossJoin(broadcast(ds1)).filter($"c1" <= $"c11" && $"c11" <= $"c2").count
ЕслиЯ реализую его с помощью RDD API, где я транслирую данные в ds1, а затем фильтрую данные в ds2, он работает нормально.
Я подтвердил, что трансляция прошла успешно.
2019-02-1423:11:55 INFO CodeGenerator: 54 - код, сгенерированный за 10.469136 мс 2019-02-14 23:11:55 INFO TorrentBroadcast: 54 - Начало чтения переменной трансляции 29 2019-02-14 23:11:55 INFO TorrentBroadcast: 54 -Чтение широковещательной переменной 29 заняло 6 мс 2019-02-14 23:11:56 INFO CodeGenerator: 54 - код, сгенерированный за 11,280087 мс
План запроса:
==Физический план ==
BroadcastNestedLoopJoin BuildRight, Cross, ((c1 # 68 <= c11 # 13) && (c11 # 13 <= c2 # 69)) <br>: - * Project []
: + -* Фильтр isnotnull (_c0 # 0)
: + - * FileScan CSV [_c0 # 0, _c1 # 1, _c2 # 2, _c3 # 3, _c4 # 4, _c5 # 5] Пакетная обработка: false, Формат: CSV,Расположение: InMemoryFileIndex [], PartitionFilters: [], PressedFilters: [IsNotNull (_c0)], ReadSchema: struct <_c0: строка, _c1: строка, _c2: строка, _c3: строка, _c4: строка, _c5: строка>
+ - BroadcastExchange IdentityBroadcastMode
+ - * Проект [c1 # 68, c2 # 69]
+ - * Фильтр (isnotnull (c1 # 68) && isnotnull (c2 # 69))
+ - * FileScan csv [c1 # 68, c2 # 69] Пакетное: false, Формат: CSV, Расположение: InMemoryFileIndex [], PartitionFilters: [], PressedFilters: [IsNotNull (c1), IsNotNull (c2)], ReadSchema: struct
затем этап не выполняется.
Я обновил код для использования широковещательной рассылки ds1, а затем произвел объединение в mapPartitions для ds2.
val ranges = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2").collect
val rangesBC = sc.broadcast(ranges)
, затем использовал этот rangeBC в методе mapPartitions, чтобыОпределите диапазон, к которому относится каждая строка в ds2, и это задание завершается через 3 часа, в то время как другое задание не завершается даже после 24 часов.Этот тип подразумевает, что оптимизатор запросов не делает то, что я хочу.
Что я делаю не так?Любые указатели будут полезны.Спасибо!