Добавление к предыдущему решению.Использование ReduByKey более эффективно, если ваши данные действительно большие, и вы заботитесь о параллелизме.
Если ваши данные большие и хотите уменьшить эффект тасования, так как groupBy
может вызвать тасование, вот другое решение, использующее RDD API
и reduceByKey
, которые будут работать на уровне раздела:
val mockedRdd = sc.parallelize(Seq(("key1", "a"), ("key1", "b"), ("key2", "a"), ("key2", "a"), ("key2", "a")))
// Converting to PairRDD
val pairRDD = new PairRDDFunctions[String, String](mockedRdd)
// Map and then Reduce
val reducedRDD = pairRDD.mapValues(v => (Set(v), 1)).
reduceByKey((v1, v2) => (v1._1 ++ v2._1, v1._2 + v1._2))
scala> val result = reducedRDD.collect()
`res0: Array[(String, (scala.collection.immutable.Set[String], Int))] = Array((key1,(Set(a, b),2)), (key2,(Set(a),4)))`
Теперь конечный результат имеет следующий формат (key, set(labels), count)
:
Array((key1,(Set(a, b),2)), (key2,(Set(a),4)))
Теперь, после того как вы соберете результаты вваш драйвер, вы можете просто принять подсчеты из наборов, которые содержат только одну метку:
// Filter our sets with more than one label
scala> result.filter(elm => elm._2._1.size == 1)
res15: Array[(String, (scala.collection.immutable.Set[String], Int))]
= Array((key2,(Set(a),4)))
Анализ с использованием Spark 2.3.2
1) Анализ(DataFrame API) groupBy Solution
Я не совсем эксперт по Spark, но здесь я брошу свои 5 центов:)
Да, DataFrame
и SQL Query
пройдут Catalyst Optimizer , который, возможно, может оптимизировать подход groupBy
.
groupBy
, предложенный с использованием DataFrame API, генерирует следующий физический план, выполнив df.explain(true)
== Physical Plan ==
*(3) HashAggregate(keys=[key#14], functions=[count(val#15), count(distinct val#15)], output=[key#14, count#94L])
+- Exchange hashpartitioning(key#14, 200)
+- *(2) HashAggregate(keys=[key#14], functions=[merge_count(val#15), partial_count(distinct val#15)], output=[key#14, count#105L, count#108L])
+- *(2) HashAggregate(keys=[key#14, val#15], functions=[merge_count(val#15)], output=[key#14, val#15, count#105L])
+- Exchange hashpartitioning(key#14, val#15, 200)
+- *(1) HashAggregate(keys=[key#14, val#15], functions=[partial_count(val#15)], output=[key#14, val#15, count#105L])
+- *(1) Project [_1#11 AS key#14, _2#12 AS val#15]
+- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#11, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#12]
+- Scan ExternalRDDScan[obj#10]
Обратите внимание , что работа была разбитаэто в три этапа, и имеет две фазы обмена.И стоит отметить, что во втором hashpartitioning exchange
использовался другой набор ключей (ключ, метка), который в этом случае будет вызывать случайное перемешивание IMO, поскольку хэшированные разделы (key, val) не обязательно будут сосуществовать с хэшированными разделамитолько с (ключом).
Вот план, визуализируемый пользовательским интерфейсом Spark:
2) Анализ СДРРешение API
Запустив reducedRDD.toDebugString
, мы получим следующий план:
scala> reducedRDD.toDebugString
res81: String =
(8) ShuffledRDD[44] at reduceByKey at <console>:30 []
+-(8) MapPartitionsRDD[43] at mapValues at <console>:29 []
| ParallelCollectionRDD[42] at parallelize at <console>:30 []
Вот план, визуализированный пользовательским интерфейсом Spark:
Вы можете ясно видеть, что подход RDD генерировал меньшее количество этапов и задач, а также не вызывает перестановок, пока мы не обработаем набор данных и не соберем его со стороны водителя.курс.Уже одно это говорит нам о том, что этот подход потребляет меньше ресурсов и времени.
Заключение в конце дня, какая оптимизация вы хотите применить, действительно зависит от требований вашего бизнеса, иРазмер данных, с которыми вы имеете дело.Если у вас нет больших данных, то подход с использованием groupBy будет простым вариантом;в противном случае важно учитывать (параллелизм, скорость, тасование и управление памятью), и большую часть времени вы можете достичь этого, анализируя планы запросов и проверяя свои задания с помощью Spark UI.