Как сделать общий заказ в Apache Spark, не встречая OOM? - PullRequest
0 голосов
/ 21 марта 2020

Мне нужно иметь идентификатор ранга для моего фрейма данных, основанный на оценке, простое row_number () over (порядок за счетом), так как rank_id дает мне OOM, так как все данные собираются на одной машине. например,

select *, row_number() over (order by score) as rank_id from tbl order by score

monotonically_increasing_id () не производит вещи, которые мне нужны, так как мне нужны последовательные идентификаторы ранга. То же самое довольно просто сделать в MapReduce, но я не нашел способа сделать это в Spark, что любопытно ...

Ответы [ 2 ]

0 голосов
/ 30 марта 2020

На самом деле мой коллега дал мне решение, которое я нашел чистым и лаконичным, которое должно использовать zipwithindex , в pyspark это будет что-то вроде:

df
 .select('score', 'user_id')
 .rdd
 .sortBy(lambda a: a[0], ascending= False)
 .zipWithIndex()
 .map(lambda x: (x[0][0],x[0][1],x[1])).take(3)
0 голосов
/ 22 марта 2020

У меня есть некоторое время, чтобы рассмотреть эту проблему, хотя 1) я думаю, что традиционная база данных Oracle могла бы быть лучше здесь, и 2) я отмечаю, что Databricks в настоящий момент работает очень медленно.

В любом случае, Spark работает на раздел (параллелизм), а не на разделы для лучшей пропускной способности, вот в чем проблема. Я не уверен, что в MR это будет проще, но если это так, то используйте это, хотя сейчас это не так модно.

Я сделал свое дело, и это работает с рангом / плотным рангом, используя Range Partitioning , что означает, что для диапазонов значений одно и то же значение попадает и только в 1 раздел, поэтому вы можете применить ваше ранжирование, а затем общее ранжирование по некоторым умам, т.е. смещениям, полагаясь на разделение по диапазону, имеющее возрастающие значения в восходящих разделах. Не совсем уверен, что кэширование было хорошо, но для небольшого количества данных потребовалось некоторое время, но я думаю, что многие люди учатся в помещении из-за ситуации, когда вы знаете, что.

Кроме того, это хороший источник : https://www.waitingforcode.com/apache-spark-sql/range-partitioning-apache-spark-sql/read

Код

// Took some short cuts on names of fields, concentrated more on algorithm itself to avoid single partition aspect

import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Window

case class X(id: Long, name: String, amount: Double)

Данные:

// Gen example data via DF
val df = Seq(
         (10, "order 1", 2000d), (11, "order 2", 240d), (12, "order 3", 232d), (13, "order 4", 100d), (214, "order 5", 11d), (15, "order 6", 1d),
         (2141, "order 7", 390d), (17, "order 8", 30d), (18, "order 9", 99d), (19, "order 10", 55d),  (20, "order 11", 129d), (21, "order 11", 75d), (15, "order 13", 1d)
        ).toDF("id", "name", "amount")

Обработка:

// Make a Dataset, makes it easier with Class in RDD conversion and back again to DF/DS
val ds = df.as[X]

// Num partitions for parallel processing so as to increase throughput, n can be anything, but you will get better throughput
// Range partitioning has all values of same value in same partition - that is the clue here
// dense and non-dense rank possibilities, not just order by row number as in your example
val n = 5
val rdd= ds.repartitionByRange(n, $"amount")
   .rdd 
   .mapPartitionsWithIndex((index, iter) => {
      iter.map(x => (index, x ))   
    })
val df2 = rdd.toDF().cache
df2.createOrReplaceTempView("tab1")
//Get the ranking per range partition
val res1 = spark.sql("""select *, rank() over (partition by _1 order by _2.amount asc) as RANK from tab1 """).cache
//res1.rdd.getNumPartitions // the number of partitions do not change, partitioning rangePartitioning maintained

res1.createOrReplaceTempView("tab2") 
// Get max val per partition, needs caching ideally, write to disk to avoid oddities in recomputation and caching bugs or not or what not. Not always convinced it works.
spark.sql("drop table if exists MAXVALS")
spark.sql(""" create table MAXVALS as select 'dummy' as dummy, _1, max(RANK) as max_rank from tab2 GROUP BY _1 UNION SELECT 'dummy', -1, 0  """)
val resA = spark.table("MAXVALS")
// Get offsets
val resB = resA.withColumn("cum_Max_RANK", sum("max_rank").over(
  Window
    .partitionBy("dummy")
    .orderBy(col("_1")) ))
resB.createOrReplaceTempView("tabC")

//So all the stuff works in parallel, but does it really help??? Is an RDBMS not better then???
val finalResult = spark.sql("""  select tab2._2, (tab2.RANK + tabC.cum_Max_RANK) as OVERALLRANK from tab2, tabc  where tabc._1 = (tab2._1 -1) ORDER BY OVERALLRANK ASC  """)
finalResult.show(false)

Результаты

+----------------------+-----------+
|_2                    |OVERALLRANK|
+----------------------+-----------+
|[15, order 6, 1.0]    |1          |
|[15, order 13, 1.0]   |1          |
|[214, order 5, 11.0]  |3          |
|[17, order 8, 30.0]   |4          |
|[19, order 10, 55.0]  |5          |
|[21, order 11, 75.0]  |6          |
|[18, order 9, 99.0]   |7          |
|[13, order 4, 100.0]  |8          |
|[20, order 11, 129.0] |9          |
|[12, order 3, 232.0]  |10         |
|[11, order 2, 240.0]  |11         |
|[2141, order 7, 390.0]|12         |
|[10, order 1, 2000.0] |13         |
+----------------------+-----------+

Выводы

Работает. Но есть ли смысл во всем этом? Да, так как выполнение работы всегда лучше, чем у ООМ.

  • Первоначальные вычисления могут выполняться параллельно для заказа, но для этого требуется repartitionByRange.
  • Но общий финал В операторе требуется сортировка, если вы хотите гарантировать порядок сортировки (для сбора, показа).
  • .explain не показывает ситуацию с одним разделом, но его необходимо протестировать в масштабе. Тем не менее, OOM можно избежать, и я подозреваю, что ненужная сортировка продолжается. Может быть, sortWithinPartition может быть ходячим, но я пока оставлю это без внимания.
  • Добавил немного логики c в исходный код ниже, чтобы посмотреть, можно ли убедить оптимизатора, чтобы избежать случайного перемешивания, но это выглядит так, как будто ненужная перестановка все еще происходит, когда оценивается вывод .explain (). Возможности для улучшения.

Модифицированный пункт, чтобы попытаться повлиять на Catalyst

 val finalResult = spark.sql("""  select tab2._1 as Z, tab2._2, (tab2.RANK + tabC.cum_Max_RANK) as OVERALLRANK from tab2, tabc  where tabc._1 = (tab2._1 -1) ORDER BY Z ASC, OVERALLRANK ASC  """)

Заключительный комментарий к спрашивающему

Человек ищет однострочный кодовый подход, который понятен. По функциональности понятно, что нужно, но ООМ доказывает, что это технически невозможно, иначе ООМ не было. Подход с одним разделом должен быть обойден, и этот метод требует большего параллелизма. Разделение подхода является основой Spark.

...