Как я могу оптимизировать агрегацию декартовых пар между двумя столбцами списка по всему фрейму данных в PySpark в масштабе? - PullRequest
0 голосов
/ 02 мая 2020

Hello World, (мне показалось уместным, так как это мой первый вопрос по stackoverflow)

Я надеюсь получить несколько советов о том, как оптимизировать агрегацию того, что я опишу как «декартовы» пары между два столбца в кадре данных, которые оба содержат списки длин.

Я предоставил код для небольшого примера, который работает точно так, как ожидалось, чтобы помочь сформулировать то, что я пытаюсь достичь, в надежде, что он лучше сформулирует проблема. Ключевые шаги:

  1. Рассчитать «декартовы» пары между col (A) и col (B) (используя UDF udf_product ())
  2. Собрать все «декартовые» пары по всем строкам в фрейме данных в одну строку (с использованием агрегации collect_set ())
  3. Свести сопоставленный результат из 3-уровневого массива в 2-уровневый массив для простоты обработки в нисходящем направлении (с помощью UDF udf_flatten3Dto2DList ())

(то есть преобразовать сопоставленные результаты из

[[[pair1 из row1], ..., [pairN из row 2]], [[pair1 из row2], ... , [pairN из row2]], ..., [[pair1 из rowM], ..., [pairN из rowM]]]

до

[[pair1 из row1], ..., [параN из строки 2], [пара1 из строки2], ..., [параN из строки2], ..., [пара1 из строкиM], ..., [параN из строкиM]

по существу удаление разделения списка на основе строк)

Collect () объединенный список всех пар вне структуры dataframe (заметьте, я только собираю () - одну строку из-за использования агрегированного преобразования select () до collect (), который должен мы надеемся, что это совсем не дорого в вычислительном отношении ... Пожалуйста, исправьте меня, если я ошибаюсь ...) Преобразование пар из формата списка в формат кортежа, чтобы позволить su использовать функцию Counter () Используйте функцию Counter (), чтобы, наконец, получить общее количество для каждой декартовой пары col (A) x col (B), отображаемой на всем фрейме данных

Ниже приведен мой маленький масштаб Например, установка параметров управления таким образом, чтобы время вычислений было небольшим, чтобы показать, что logi c работает.

### Import relevant modules / functions
import random
from pyspark.context import SparkContext
from pyspark.storagelevel import StorageLevel
from pyspark.sql.types import *
from pyspark.sql.functions import * 
import pyspark.sql.functions as f
from pyspark.sql.window import Window
from collections import Counter ## used for dictionary work
from itertools import product ## product('AB', 'xyz') == 'Ax' 'Ay' 'Az' 'Bx' 'By' 'Bz'

### Define UDFs to calculate all col(A) and col(B) 'dot product' pairings
udf_flatten3Dto2DList = udf(lambda x: [item for list in x for item in list], ArrayType(ArrayType(LongType())))
def product_lists(x, y):
  return [[x for x in pair] for pair in product(x, y)]
udf_product = udf(product_lists, ArrayType(ArrayType(LongType())))

############ EXAMPLE THAT WORKS AS EXPECTED ################
### Define parameters to control scale of problem
pair_index_options = 3
num_rows = 10

### Create dummy dataframe
a = spark.createDataFrame([(1, [random.randrange(1,pair_index_options + 1, random.randrange(1,pair_index_options + 1)], [random.randrange(1,pair_index_options + 1), random.randrange(1,pair_index_options + 1)]) for i in range(num_rows)], ['ID', 'A', 'B'])
a.show()

>>> OUTPUT:
+---+------+------+
| ID|     A|     B|
+---+------+------+
|  1|[2, 2]|[3, 3]|
|  1|[2, 3]|[1, 2]|
|  1|[1, 2]|[2, 1]|
|  1|[1, 3]|[2, 1]|
|  1|[2, 1]|[1, 1]|
|  1|[3, 1]|[2, 1]|
|  1|[3, 1]|[2, 3]|
|  1|[1, 1]|[1, 1]|
|  1|[1, 1]|[3, 1]|
|  1|[2, 2]|[2, 1]|
+---+------+------+


pair_lists = a.withColumn('product', udf_product('A','B'))
              .select(f.collect_list('product').alias('product'))
              .withColumn('product_pairs', udf_flatten3Dto2DList('product'))
              .collect()[0]['product_pairs']
print(pair_lists)

>>> OUTPUT:
[[1, 2], [1, 1], [2, 2], [2, 1], [1, 1], [1, 1], [1, 1], [1, 1], [1, 2], [1, 1], [3, 2], [3, 1], [1, 3], [1, 1], [1, 3], [1, 1], [2, 1], [2, 2], [3, 1], [3, 2], [3, 2], [3, 3], [1, 2], [1, 3], [2, 1], [2, 1], [1, 1], [1, 1], [2, 2], [2, 1], [2, 2], [2, 1], [2, 3], [2, 3], [2, 3], [2, 3], [3, 2], [3, 1], [1, 2], [1, 1]]

### Convert lists to tuples to enable creation of a counter Dictionary (lists are not 'keyable' as they are not immutable)
pair_tuples = [tuple(pair) for pair in pair_lists]
dict_Counter = Counter(pair_tuples)
print(dict_Counter)

>>> OUTPUT:
Counter({(1, 1): 11, (2, 1): 6, (1, 2): 4, (2, 2): 4, (3, 2): 4, (2, 3): 4, (3, 1): 3, (1, 3): 3, (3, 3): 1})


Ниже приведен мой пример «реального мира», настройка управления параметры для масштабирования проблемы до одного, который более соответствует масштабу моей реальной проблемы (вызывая то, что, как я понял, является своего рода ошибкой нехватки памяти). * 1 039 *

############ EXAMPLE THAT CRASHES ################
pair_index_options = 200 ## Scale such that there are approximately 40,000 pair combinations
num_pairs = int(1e8) ### Scale such that there are approximately 100,000,000 rows to process

a = spark.createDataFrame([(1, [random.randrange(1,pair_index_options + 1), random.randrange(1,pair_index_options + 1)], [random.randrange(1,pair_index_options + 1), random.randrange(1,pair_index_options + 1)]) for i in range(num_pairs)], ['ID', 'A', 'B'])
a.show()
pair_lists = a.withColumn('product', udf_product('A', 'B')).select(f.collect_list('product').alias('product')).withColumn('product_pairs', udf_flatten3Dto2DList('product')).collect()[0]['product_pairs']
print(pair_lists)
print("")
pair_tuples = [tuple(pair) for pair in pair_lists]
dict_Counter = Counter(pair_tuples)
print(dict_Counter)

Ниже приведена ошибка, возникающая в Databricks при запуске примера «реального мира».

Мои ограниченные знания по интерпретации журналов ошибок заставляют меня думать, что Суть проблемы связана с этим выводом «Невозможно увеличить BufferHolder по размеру 176, потому что размер после увеличения превышает ограничение размера 2147483632», который, основываясь на поиске в Google, связан с проблемами с памятью, но я не уверен, как решить this.

org. apache .spark.SparkException: задание прервано из-за сбоя этапа: задание 0 на этапе 77.0 не выполнено 4 раза, последний сбой: потерянное задание 0.3 на этапе 77.0 (TID 1986 , 10.139.64.11, исполнитель 0): java .lang.IllegalArgumentException: Невозможно увеличить BufferHolder до размера 176, поскольку размер после увеличения превышает ограничение размера 2147483632

------------ -------------------------------------------------- ------------- Py4JJavaError Traceback (последний вызов был последним) в 19 a = spark.createDataFrame ([(1, [1, 2], [3, 4]) для i в диапазоне (num_pairs)], ['ID', 'A', 'B']) 20 a.show () ---> 21 pair_lists = a.withColumn ('product', udf_product ('A', 'B')). select (f.collect_list ('product'). alias ('product')). withColumn ('product_pairs', udf_flatten3Dto2DList ('product')). collect () [0 ] ['product_pairs'] 22 print (pair_lists) 23 print ("")

/ databricks / spark / python / pyspark / sql / dataframe.py в collect (self) 550 # Путь по умолчанию, используемый в OSS Spark / для кластеров без DF-ACL: 551 с SCCallSiteSyn c (самостоятельно. _s c) as css: -> 552 sock_info = self._jdf.collectTo Python () 553 список возврата (_load_from_socket (sock_info, BatchedSerializer (PickleSerializer ()))) 554

/ databricks / spark / python / lib / py4j-0.10.7-sr c .zip / py4j / java_gateway.py в call (self, * args) 1255 answer = self.gateway_client.send_command (команда ) 1256 return_value = get_return_value (-> ответ 1257, self.gateway_client, self.target_id, self.name) 1258 1259 для temp_arg в temp_args:

/ databricks / spark / python / pyspark / sql / utils.py in deco (* a, ** kw) 61 def deco (* a, ** kw): 62 try: ---> 63 возвращает f (* a, ** kw) 64 за исключением py4j.protocol.Py4JJavaError как e: 65 s = e.java_exception.toString ()

/ databricks / spark / python / lib / py4j-0.10.7-sr c .zip / py4j / protocol.py в get_return_value (ответ, gateway_client, target_id, name) 326 повысить Py4JJavaError (327 "Произошла ошибка при вызове {0} {1} {2}. \ n". -> 328 формат (target_id, ".", name), значение) 329 else: 330 поднять Py4JError (

Py4J JavaError: Произошла ошибка при вызове o12878.collectTo Python. : org. apache .spark.SparkException: задание прервано из-за сбоя этапа: сбой задачи 0 на этапе 77.0 4 раза, последний сбой: потерянное задание 0.3 на этапе 77.0 (TID 1986, 10.139.64.11, исполнитель 0): java .lang.IllegalArgumentException: Невозможно увеличить BufferHolder по размеру 176, поскольку размер после увеличения превышает ограничение размера 2147483632 в org. apache .spark. sql .catalyst.expressions.codegen.BufferHolder.grow (BufferHolder. java : 71) в орг. apache .spark. sql .catalyst.expressions.codegen.UnsafeWriter.grow (UnsafeWriter. java: 62) в орг. apache .spark. sql .catalyst.expressions .codegen.UnsafeWriter.write (UnsafeWriter. java: 160) в org. apache .spark. sql .catalyst.expressions.GeneratedClass $ SpecificUnsafeProjection.apply (неизвестный источник) в org. apache .spark. sql .execution.aggregate.AggregationIterator $$ anonfun $ generateResultProjection $ 1.apply (AggregationIterator. scala: 235) в орг. apache .spark. sql .execution.aggregate.AggregationIterator $$ anonfun $ generateResPro .Не (Aggregatio nIterator. scala: 224) в орг. apache .spark. sql .execution.aggregate.ObjectAggregationIterator.next (ObjectAggregationIterator. scala: 86) в орг. apache .spark. sql. execute.aggregate.ObjectAggregationIterator.next (ObjectAggregationIterator. scala: 33) в scala .collection.Iterator $$ anon $ 11.next (Iterator. scala: 410) в scala .collection.Iterator $$ anon $ 11.next (Iterator. scala: 410) в scala .collection.Iterator $$ anon $ 11.next (Iterator. scala: 410) в scala .collection.Iterator $ GroupedIterator.takeDestructively (Iterator. scala: 1074) в scala .collection.Iterator $ GroupedIterator. go (Итератор. scala: 1089) в scala .collection.Iterator $ GroupedIterator.fill (Итератор. scala: 1126) в scala .collection.Iterator $ GroupedIterator.hasNext (Iterator. scala: 1130) в scala .collection.Iterator $$ anon $ 11.hasNext (Iterator. scala: 409) в scala .collection .Iterator $ class.foreach (Iterator. scala: 891) в scala .collection.AbstractIterator.foreach (Iterator. scala: 1334) в org. apache .spark.api. python .PythonRDD $. writeIteratorToStream (PythonRDD. scala: 368) в орг. apache .spark. sql .execution. python .PythonUDFRunner $$ anon $ 2.writeIteratorToStream (PythonUDFRunner. scala: 50) в орг. apache .spark.api. python .BasePythonRunner $ WriterThread $$ anonfun $ run $ 1.apply (PythonRunner. scala: 430) в org. apache .spark.util.Utils $ .logUncaughtExceptions (Utils. * 1148) *: 2136) в орг. apache .spark.api. python .BasePythonRunner $ WriterThread.run (PythonRunner. scala: 236)

Отслеживание стека драйверов: в орг. apache .spark .scheduler. DAGScheduler.org $ apache $ spark $ планировщик $ DAGScheduler $$ failJobAndIndependentStages (DAGScheduler. scala: 2362) в организации. apache .spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply (DAGSchedu: 2350) в орг. apache .spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply (DAGScheduler. scala: 2349) в scala .collection.mutable.ResizableArray $ class.foreach (ResizableArray. * 1159) *: 59) в scala .collection.mutable.ArrayBuffer.foreach (ArrayBuffer. scala: 48) в орг. apache .spark.scheduler.DAGScheduler.abortStage (DAGScheduler. scala: 2349) в орг. . apache .spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.apply (DAGScheduler. scala: 1102) в организации. apache .spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed DAG (DAG. scala: 1102) в scala .Option.foreach (опция. scala: 257) в орг. apache .spark.scheduler.DAGScheduler.handleTaskSetFailed (DAGScheduler. scala: 1102) в орг. apache .spark.scheduler.DAGSchedulerEventProcessL oop .doOnReceive (DAGScheduler. scala: 2582) в орг. apache .spa rk.scheduler.DAGSchedulerEventProcessL oop .onReceive (DAGScheduler. scala: 2529) в org. apache .spark.scheduler.DAGSchedulerEventProcessL oop .onReceive (DAGScheduler. scala: 2517) или * .spark.util.EventLoop $$ anon $ 1.run (EventL oop. scala: 49) в org. apache .spark.scheduler.DAGScheduler.runJob (DAGScheduler. scala: 897) в org . apache .spark.SparkContext.runJob (SparkContext. scala: 2280) в орг. apache .spark.SparkContext.runJob (SparkContext. scala: 2378) в орг. apache .spark. sql .execution.collect.Collector.runSparkJobs (Collector. scala: 245) в орг. apache .spark. sql .execution.collect.Collector.collect (Collector. scala: 280) в орг. . apache .spark. sql .execution.collect.Collector $ .collect (Collector. scala: 80) в орг. apache .spark. sql .execution.collect.Collector $ .collect ( Collector. scala: 86) в орг. apache .spark. sql .execution.ResultCacheManager.getOrComputeResult (ResultCacheManager. scala: 508) в орг. apache .spark. sql .execution. ResultCacheManager.getOrComputeResult (ResultCacheManage r. scala: 480) в орг. apache .spark. sql .execution.SparkPlan.executeCollectResult (SparkPlan. scala: 325) в орг. apache .spark. sql .Dataset $ $ anonfun $ 50.apply (набор данных. scala: 3358) в орг. apache .spark. sql .Dataset $$ anonfun $ 50.apply (набор данных. scala: 3357) в орг. apache. spark. sql .Dataset $$ anonfun $ 54.apply (Набор данных. scala: 3492) в org. apache .spark. sql .Dataset $$ anonfun $ 54.apply (Набор данных. scala: 3487 ) в org. apache .spark. sql .execution.SQLExecution $$ anonfun $ withCustomExecutionEnv $ 1.apply (SQLExecution. scala: 113) в org. apache .spark. sql .execution.SQLExecution $ .withSQLConfPropagated (SQLExecution. scala: 242) в орг. apache .spark. sql .execution.SQLExecution $ .withCustomExecutionEnv (SQLExecution. scala: 99) в орг. apache .spark. 1233 * .execution.SQLExecution $ .withNewExecutionId (SQLExecution. scala: 172) в org. apache .spark. sql .Dataset.org $ apache $ spark $ sql $ Набор данных $$ withAction (Набор данных. * 1237) *: 3487) в орг. apache .spark. sql .Dataset.collectTo Python (Набор данных. scala: 3357) в sun.reflect.NativeMeth odAccessorImpl.invoke0 (родной метод) в sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl. java: 62) в sun.reflect.DelegatingMethodAccessorImpl.invoke * 12го делегирования. 12f. Method.invoke (Method. java: 498) в py4j.reflection.MethodInvoker.invoke (MethodInvoker. java: 244) в py4j.reflection.ReflectionEngine.invoke (ReflectionEngine. java: 380) в py4j.Gateway .invoke (Gateway. java: 295) на py4j.commands.AbstractCommand.invokeMethod (AbstractCommand. java: 132) на py4j.commands.CallCommand.execute (CallCommand. java: 79) на py4j.GatewayConnection. run (GatewayConnection. java: 251) в java .lang.Thread.run (Thread. java: 748) Причина: java .lang.IllegalArgumentException: Невозможно увеличить BufferHolder по размеру 176, потому что размер после выращивания превышает ограничение размера 2147483632 в орг. apache .spark. sql .catalyst.expressions.codegen.BufferHolder.grow (BufferHolder. java: 71) в орг. apache .spark. sql .catalyst.expressions.codegen.UnsafeWriter.grow (UnsafeWriter. java: 62) в org. apache .spark. sql .catalyst.expressions.codegen.UnsafeWriter.write (UnsafeWriter. java) : 160) в орг. apache .spark. sql .catalyst.expressions.GeneratedClass $ SpecificUnsafeProjection.apply (неизвестный источник) в орг. apache .spark. sql .execution.aggregate.AggregationIterator $$ anonfun $ generateResultProjection $ 1.apply (AggregationIterator. scala: 235) в организации. apache .spark. sql .execution.aggregate.AggregationIterator $$ anonfun $ generateResultProjection $ 1.apply (AggregationIterator. scala: 224) в org. apache .spark. sql .execution.aggregate.ObjectAggregationIterator.next (ObjectAggregationIterator. scala: 86) в org. apache .spark. sql .execution.aggregate.ObjectAggregationIterator.next (ObjectAgg . scala: 33) на scala .co llection.Iterator $$ anon $ 11.next (Iterator. scala: 410) в scala .collection.Iterator $$ anon $ 11.next (Iterator. scala: 410) в scala .collection.Iterator $ $ anon $ 11.next (Iterator. scala: 410) в scala .collection.Iterator $ GroupedIterator.takeDestructively (Iterator. scala: 1074) в scala .collection.Iterator $ GroupedIterator. go ( Iterator. scala: 1089) в scala .collection.Iterator $ GroupedIterator.fill (Iterator. scala: 1126) в scala .collection.Iterator $ GroupedIterator.hasNext (Iterator. scala: 1130) в scala .collection.Iterator $$ anon $ 11.hasNext (Iterator. scala: 409) в scala .collection.Iterator $ class.foreach (Iterator. scala: 891) в scala .collection .AbstractIterator.foreach (Iterator. scala: 1334) в org. apache .spark.api. python .PythonRDD $ .writeIteratorToStream (PythonRDD. scala: 368) в org. apache .spark. sql .execution. python .PythonUDFRunner $$ anon $ 2.writeIteratorToStream (PythonUDFRunner. scala: 50) в org. apache .spark.api. python .BasePythonRunner $ WriterThread $$ anonfun $ run $ 1 . .Не (PythonRunner scala: 4 30) в орг. apache .spark.util.Utils $ .logUncaughtExceptions (Utils. scala: 2136) в орг. apache .spark.api. python .BasePythonRunner $ WriterThread.run (PythonRunner. scala: 236)

Мой вопрос, я думаю, состоит из двух частей:

  1. Есть ли лучший подход к достижению общего количества пар, существующих в начальном кадре данных на основе col (A) и col (B)?
  2. Если нет (крайне маловероятно;)), есть ли способ оптимизировать мой подход выше, чтобы избежать ошибки памяти ?

Заранее спасибо!

Дэвид

1 Ответ

0 голосов
/ 03 мая 2020

В итоге, декартово соединение / кросс-продукт с большими наборами данных называется «соединение больших и больших таблиц». Что-то в Spark не очень хорошо ..

Если у вас недостаточно ОЗУ, а затем достаточно диска для разлива, он не будет работать и не получит OOM. Размер разделов теперь больше, но в итоге это кошмарный трафик между исполнителями.

...