Модифицированный countByKey в спарк - PullRequest
2 голосов
/ 13 апреля 2019

У меня есть следующий фрейм данных:

+------+-------+
| key  | label |
+------+-------+
| key1 | a     |
| key1 | b     |
| key2 | a     |
| key2 | a     |
| key2 | a     |
+------+-------+

Я хочу модифицированную версию countByKeys в spark, которая возвращает вывод следующим образом:

+------+-------+
| key  | count |
+------+-------+
| key1 |     0 |
| key2 |     3 |
+------+-------+
//explanation: 
if all labels under a key are same, then return count of all rows under a key 
else count for that key is 0

Мой подход для решения этой проблемы:

Шаги:

  1. reduceByKey(): объединить все метки (рассматривать метки как строки), чтобы получить кадр данных типа < key,concat_of_all_labels >
  2. mapValues(): проанализировать каждыйстрока символьно, чтобы проверить, есть ли все то же самое.Если они имеют одинаковое возвращаемое количество меток, в противном случае возвращают 0.

Я новичок в зажигании, и я чувствую, что должен быть какой-то эффективный способ сделать это.Есть ли лучший способ выполнить эту задачу?

Ответы [ 3 ]

6 голосов
/ 13 апреля 2019

Это довольно просто: получить и счет, и различный счет по ключу, тогда это просто случай, когда ... тогда ...

val df = Seq(("key1", "a"), ("key1", "b"), ("key2", "a"), ("key2", "a"), ("key2", "a")).toDF("key", "label")
df.groupBy('key)
  .agg(countDistinct('label).as("cntDistinct"), count('label).as("cnt"))
  .select('key, when('cntDistinct === 1, 'cnt).otherwise(typedLit(0)).as("count"))
  .show

+----+-----+
| key|count|
+----+-----+
|key1|    0|
|key2|    3|
+----+-----+
2 голосов
/ 13 апреля 2019

Добавление к предыдущему решению.Использование ReduByKey более эффективно, если ваши данные действительно большие, и вы заботитесь о параллелизме.

Если ваши данные большие и хотите уменьшить эффект тасования, так как groupBy может вызвать тасование, вот другое решение, использующее RDD API и reduceByKey, которые будут работать на уровне раздела:

val mockedRdd = sc.parallelize(Seq(("key1", "a"), ("key1", "b"), ("key2", "a"), ("key2", "a"), ("key2", "a")))

// Converting to PairRDD
val pairRDD = new PairRDDFunctions[String, String](mockedRdd)

// Map and then Reduce
val reducedRDD = pairRDD.mapValues(v => (Set(v), 1)).
     reduceByKey((v1, v2) => (v1._1 ++ v2._1, v1._2 + v1._2))

scala> val result = reducedRDD.collect()
`res0: Array[(String, (scala.collection.immutable.Set[String], Int))] = Array((key1,(Set(a, b),2)), (key2,(Set(a),4)))`

Теперь конечный результат имеет следующий формат (key, set(labels), count):

Array((key1,(Set(a, b),2)), (key2,(Set(a),4)))

Теперь, после того как вы соберете результаты вваш драйвер, вы можете просто принять подсчеты из наборов, которые содержат только одну метку:

// Filter our sets with more than one label
scala> result.filter(elm => elm._2._1.size == 1)
res15: Array[(String, (scala.collection.immutable.Set[String], Int))] 
              = Array((key2,(Set(a),4)))

Анализ с использованием Spark 2.3.2

1) Анализ(DataFrame API) groupBy Solution

Я не совсем эксперт по Spark, но здесь я брошу свои 5 центов:)

Да, DataFrame и SQL Query пройдут Catalyst Optimizer , который, возможно, может оптимизировать подход groupBy.

groupBy, предложенный с использованием DataFrame API, генерирует следующий физический план, выполнив df.explain(true)

== Physical Plan ==
*(3) HashAggregate(keys=[key#14], functions=[count(val#15), count(distinct val#15)], output=[key#14, count#94L])
+- Exchange hashpartitioning(key#14, 200)
   +- *(2) HashAggregate(keys=[key#14], functions=[merge_count(val#15), partial_count(distinct val#15)], output=[key#14, count#105L, count#108L])
      +- *(2) HashAggregate(keys=[key#14, val#15], functions=[merge_count(val#15)], output=[key#14, val#15, count#105L])
         +- Exchange hashpartitioning(key#14, val#15, 200)
            +- *(1) HashAggregate(keys=[key#14, val#15], functions=[partial_count(val#15)], output=[key#14, val#15, count#105L])
               +- *(1) Project [_1#11 AS key#14, _2#12 AS val#15]
                  +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#11, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#12]
                     +- Scan ExternalRDDScan[obj#10]

Обратите внимание , что работа была разбитаэто в три этапа, и имеет две фазы обмена.И стоит отметить, что во втором hashpartitioning exchange использовался другой набор ключей (ключ, метка), который в этом случае будет вызывать случайное перемешивание IMO, поскольку хэшированные разделы (key, val) не обязательно будут сосуществовать с хэшированными разделамитолько с (ключом).

Вот план, визуализируемый пользовательским интерфейсом Spark:

groupBy solution

2) Анализ СДРРешение API

Запустив reducedRDD.toDebugString, мы получим следующий план:

scala> reducedRDD.toDebugString
res81: String =
(8) ShuffledRDD[44] at reduceByKey at <console>:30 []
 +-(8) MapPartitionsRDD[43] at mapValues at <console>:29 []
    |  ParallelCollectionRDD[42] at parallelize at <console>:30 []

Вот план, визуализированный пользовательским интерфейсом Spark:

RDD API Approach

Вы можете ясно видеть, что подход RDD генерировал меньшее количество этапов и задач, а также не вызывает перестановок, пока мы не обработаем набор данных и не соберем его со стороны водителя.курс.Уже одно это говорит нам о том, что этот подход потребляет меньше ресурсов и времени.

Заключение в конце дня, какая оптимизация вы хотите применить, действительно зависит от требований вашего бизнеса, иРазмер данных, с которыми вы имеете дело.Если у вас нет больших данных, то подход с использованием groupBy будет простым вариантом;в противном случае важно учитывать (параллелизм, скорость, тасование и управление памятью), и большую часть времени вы можете достичь этого, анализируя планы запросов и проверяя свои задания с помощью Spark UI.

0 голосов
/ 13 апреля 2019
scala> val df = sc.parallelize(Seq(("key1", "a"), ("key1", "b"), ("key2", "a"), ("key2", "a"), ("key2", "a")))

scala> val grpby = df.groupByKey()

scala> val mp = gb.map( line  => (line._1,line._2.toList.length,line._2.toSet.size))
                  .map { case(a,b,c) => (a,if (c!=1) 0 else b) }

scala> val finres  =  mp.toDF("key","label")

scala> finres.show
+----+-----+
| key|label|
+----+-----+
|key1|    0|
|key2|    3|
+----+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...