Как оптимизировать агрегацию окон для больших windows? - PullRequest
2 голосов
/ 15 января 2020

Я использую оконные функции с огромным окном со свечой 2.4.4, например.

Window
  .partitionBy("id")
  .orderBy("timestamp")

В моих тестах у меня есть около 70 различных идентификаторов, но у меня может быть около 200 000 строк по идентификаторам. Без дальнейшей настройки я должен выделить много памяти моим исполнителям, чтобы избежать этого OOM:

org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 16384 bytes of memory, got 0
at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:161)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:128)
at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.add(ExternalAppendOnlyUnsafeRowArray.scala:115)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.fetchNextPartition(WindowExec.scala:345)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.next(WindowExec.scala:371)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.next(WindowExec.scala:303)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$1.hasNext(WholeStageCodegenExec.scala:631)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.fetchNextRow(WindowExec.scala:314)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.<init>(WindowExec.scala:323)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11.apply(WindowExec.scala:303)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11.apply(WindowExec.scala:302)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

Глядя в исходном коде, я обнаружил этот параметр, который вообще не задокументирован:

spark.sql.windowExec.buffer.in.memory.threshold

Учитывая большой размер (например, 1.000.000), мне больше не нужно столько памяти. Насколько я понимаю, это количество строк, которые буферизуются; Я полагаю, что увеличение этого параметра не дублирует столько строк в памяти исполнителей, но мне это не совсем понятно.

Может кто-нибудь объяснить мне, как именно windows обрабатывается на стороне исполнителя? Почему данные дублируются? Как избежать этого дублирования и сделать процесс быстрее, с множеством строк в каждом окне? Какие параметры можно использовать?

Thx.

1 Ответ

1 голос
/ 15 января 2020

Я обнаружил этот параметр, который совсем не задокументирован:

Это свойство внутренней конфигурации.

Из чтения исходного кода мне удалось "собрать" следующее:

spark. sql .windowExe c .buffer.in.memory.threshold (внутренний) Порог для количества строк, которые гарантированно будут храниться в памяти по WindowExec физический оператор.

По умолчанию: 4096

Используйте метод SQLConf.windowExecBufferInMemoryThreshold для доступа к текущему значению.

Говоря о внутренних свойствах WindowExec оператор, есть еще один, который вам может понадобиться для настройки производительности:

spark. sql .windowExe c .buffer.spill.threshold (внутренний) Threshold для количества строк, буферизованных в физическом операторе WindowExec.

По умолчанию: 4096

Используйте метод SQLConf.windowExecBufferSpillThreshold для доступа к текущему значению.

Увы, я не могу полностью объяснить внутренности.

...