(Объединение в Spark) Как объединить два больших RDD Spark с сильно дублированными ключами без проблем с памятью? - PullRequest
0 голосов
/ 29 мая 2018

В этом предыдущем вопросе я пытался избежать проблем с памятью с Spark join, избегая использования join.

В этом новом вопросе я использую join, но пытаюсь исправить проблемы с памятью .

Это два моих СДР:

  1. productToCustomerRDD:
    Размер: очень большой, может иметь миллионы различных ключей
    Разделение на ключи с HashPartitioner
    Некоторые ключи будут очень дублированыа некоторые не будут.

    (toast, John)
    (butter, John)
    (toast, Jane)
    (jelly, Jane)
    
  2. productToCountRDD:
    Размер: очень большой, может иметь миллионы различных ключей, слишком большой для broadcast
    Разделение на ключи с HashPartitioner
    Ключи уникальны , значение - количество клиентов, которые приобрели продукт.

    (toast, 2)
    (butter, 1)
    (jelly, 1)
    

Я хотел бы присоединиться к этим двум СДР, результат будет:

customerToProductAndCountRDD:

(toast, (John, 2))
(butter, (John, 1))
(toast, (Jane, 2))
(jelly, (Jane, 1))

Если я присоединяюсь к двум RDD с productToCustomerRDD.join(productToCountRDD), я получаю OutOfMemoryError на двух разделах (из тысяч).В интерфейсе Spark я заметил, что на этапе, который содержит join, в столбце Input Size / Records все разделы имеют ряд записей от 4K до 700K ,Все, кроме двух разделов, которые создали OOM: один имеет 9M записей, а другой - 6M записей.

Как я понимаю, для присоединения,пары с одним и тем же ключом необходимо перетасовать и переместить в один и тот же раздел (если они ранее не были разделены по ключу).Однако, поскольку некоторые ключи встречаются очень часто (например, продукт, который был приобретен почти каждым клиентом в наборе данных), огромное количество данных может быть перемещено в один раздел, либо во время join, либо во время * 1066.* прямо перед объединением.

Правильно ли я понимаю?
Есть ли способ избежать этого?
Может ли быть способ до join, не имея всех данных для одного сильно дублированногоключ на том же разделе?

Ответы [ 2 ]

0 голосов
/ 31 августа 2018

На самом деле, это стандартная проблема в Spark, называемая «перекосом соединения»: одна из сторон объединения перекошена, то есть некоторые ее ключи встречаются гораздо чаще, чем другие.Некоторые ответы, которые не сработали для меня, можно найти здесь .

Стратегия, которую я использовал, основана на методе GraphFrame.skewedJoin(), определенном здесь и его использовании.в ConnectedComponents.skewedJoin() здесь .Объединение будет выполняться путем объединения наиболее часто используемых ключей с использованием широковещательного объединения и менее часто используемых ключей с использованием стандартного объединения.

В моем примере (OP) productToCountRDD уже содержит информацию о частоте нажатия клавиш.Таким образом, это выглядит так:

  • Фильтр productToCountRDD, чтобы сохранить только значения, превышающие фиксированный порог, и collectAsMap() для драйвера.
  • Передать эту карту всем исполнителям..
  • Разделение productToCustomerRDD на два RDD: ключи, которые находятся на карте вещания (частые ключи), и ключи, которых нет (нечастые ключи).
  • Объединение для частых ключейвыполняется с помощью mapToPair, получение count с карты трансляции
  • Соединение для нечастых ключей выполняется с помощью join.
  • Используйте union в конце, чтобы получитьполный СДР.
0 голосов
/ 29 мая 2018

мой первый вопрос: вам действительно нужны эти подробные данные?Тебе действительно нужно знать, что Джон купил 2 тота и так далее?Мы находимся в контексте больших данных и работаем с большим количеством данных, поэтому иногда агрегирование полезно для уменьшения количества элементов и получения хороших результатов с точки зрения анализа и производительности.Поэтому, если вы хотите узнать, сколько раз продукт был продан, вы можете использовать pairRDD (product, count) [таким образом, у вас будет один элемент для каждого продукта] или если вы хотите узнать предпочтения пользователей, вы можете использовать pairRDD (пользователь, список купленных продуктов) [таким образом, у вас будет элемент для каждого пользователя].Если вам действительно нужно знать, что тост был куплен у Jhon, почему вы хотите разделить тост-ключ на разные перераспределения?Таким образом, вы не можете вычислить глобальный результат, потому что в каждом чанке у вас будет только часть информации о ваших ключах.

...