Я довольно новичок в Scala и буду признателен за любые советы экспертов.Мне нужно проверить несколько критериев соответствия в определенном порядке для очень большого фрейма данных искры (десятки или сотни миллионов записей).Мой текущий подход очень медленный и часто терпит неудачу в течение длительного времени работы.
Вот полное описание (с настройками и изменениями, которые должны быть разделены): у меня есть набор данных, который имеет несколько столбцов (назовем их col1 - 3).Другой метод изучает эти столбцы и решает, к какой группе / подгруппам относится данная запись, и они сохраняются в столбце правомочностей:
+------------------------------------------------------+
| col1 | col2 | col3 | eligibilities |
+ -----------------------------------------------------+
| x1 | y1 | z1 | A, a1, a2, B |
| x2 | y2 | z2 | A,a1 |
| x3 | y3 | z3 | A,a2, a3, B, b1 |
| x4 | y4 | z3 | A,a2 |
| x1 | y2 | z2 | A,a3,a2,B |
| x2 | y4 | z1 | A,a2,B |
| ... | ... | ... | ... |
+ -----------------------------------------------------+
, которые теперь имеют группы A и B, а также подгруппы a1, a2, a3 и b1.,Мне нужно проверить в порядке , если каждая из подгрупп достигла минимального предела количества, учитывая, что каждая строка может быть назначена только одной подгруппе, и количество исключает те, которые уже были назначены.Эта часть логики, поскольку она требует нескольких команд подсчета, очень медленная.Чтобы упростить процесс, я сначала группирую по «критериям» и подсчитываю строки, затем применяю вышеуказанную логику к сгруппированному набору данных.
+-------------------------------+
| eligibilities | count |
+-------------------------------+
| A, a1, a2, B | 120000 |
| A, a1. | 43000 |
| A, a2, a3, B, b1 | 321000 |
| A,a3,a2,B. | 5800 |
+-------------------------------+
Теперь я применяю логику принятия решения следующим образом:
- фильтрует набор данных сгруппированных по критериям для строк, которые включают a1.
- сумма столбца count
- Если больше порога.Присвойте им a1 и удалите строки, иначе игнорируйте
- goto 1 и выполните для a2, a3, b1, b2 и т. Д.
Вот общий поток моего кода:
val subgroups = list("a1","a2","a3","b1")
var dfG = dfx.groupBy("eligibilities").count()
subgroups.foreach { subgroup =>
val filtered_df = dfG.filter(col("eligibilities").contains(subgroup))
if (filtered_df(2).count() > 1) {
val rowCount = filtered_df(col("count")).rdd.map(_ (0).asInstanceOf[Long]).reduce(_ + _)
if (rowCount > threshold) {
dfG = dfG.filter(not(col("eligibilities").contains(subgroup)))
}
}
}
Есть ли лучший способ Spark / Scala для достижения этой цели?