В этом предыдущем вопросе я пытался избежать проблем с памятью с Spark join
, избегая использования join
.
В этом новом вопросе я использую join
, но пытаюсь исправить проблемы с памятью .
Это два моих СДР:
productToCustomerRDD:
Размер: очень большой, может иметь миллионы различных ключей
Разделение на ключи с HashPartitioner
Некоторые ключи будут очень дублированыа некоторые не будут.
(toast, John)
(butter, John)
(toast, Jane)
(jelly, Jane)
productToCountRDD:
Размер: очень большой, может иметь миллионы различных ключей, слишком большой для broadcast
Разделение на ключи с HashPartitioner
Ключи уникальны , значение - количество клиентов, которые приобрели продукт.
(toast, 2)
(butter, 1)
(jelly, 1)
Я хотел бы присоединиться к этим двум СДР, результат будет:
customerToProductAndCountRDD:
(toast, (John, 2))
(butter, (John, 1))
(toast, (Jane, 2))
(jelly, (Jane, 1))
Если я присоединяюсь к двум RDD с productToCustomerRDD.join(productToCountRDD)
, я получаю OutOfMemoryError
на двух разделах (из тысяч).В интерфейсе Spark я заметил, что на этапе, который содержит join
, в столбце Input Size / Records
все разделы имеют ряд записей от 4K до 700K ,Все, кроме двух разделов, которые создали OOM: один имеет 9M записей, а другой - 6M записей.
Как я понимаю, для присоединения,пары с одним и тем же ключом необходимо перетасовать и переместить в один и тот же раздел (если они ранее не были разделены по ключу).Однако, поскольку некоторые ключи встречаются очень часто (например, продукт, который был приобретен почти каждым клиентом в наборе данных), огромное количество данных может быть перемещено в один раздел, либо во время join
, либо во время * 1066.* прямо перед объединением.
Правильно ли я понимаю?
Есть ли способ избежать этого?
Может ли быть способ до join
, не имея всех данных для одного сильно дублированногоключ на том же разделе?