Как оптимально передать большой объект в качестве дополнительного параметра функции карты без копирования этого объекта между рабочими местами / заданиями? - PullRequest
0 голосов
/ 06 февраля 2019

У меня есть список кортежей (tuples_list).Я хочу выполнить операцию с картой, но часть операции с картой требует информации из довольно большой матрицы (матрицы).Не пишет в матрицу, только читает.Это сжатая скудная матрица разреженных строк (csr_matrix).

Таким образом, функция карты может выглядеть примерно так:

def map_function(list_element, matrix):
    info = get_element_specific_info_from_matrix(list_element, matrix)
    new_element = get_new_element(info)
    return new_element

Вот краткий обзор того, что делает мой код:

from pyspark import SparkContext

sc = SparkContext("local", "Process Name")
matrix = ...
tuples_list = ...

...

tuples_list = sc.parallelize(tuples_list)
results_list = tuples_list.map(lambda tup: map_function(tup, matrix)) 
results_list = results_list.collect() # error happens here

Проблема в том, что я продолжаю сталкиваться с проблемами кучи / памяти, и я подозреваю, что это связано с тем, что драйвер Spark создает копии этой матрицы для передачи своим работникам:

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readBroadcastFromFile.
: java.lang.OutOfMemoryError: Java heap space

Я не могу передать подмножество этой матрицы, к сожалению.Невозможно сделать какие-либо предположения о том, какие данные необходимы элементу из матрицы.

Что я хотел бы знать :

  • Как проверить, является ли илине копии сделаны из этой матрицы?
  • Если копии сделаны, как я могу сказать Spark не делать копии?(Эта работа выполняется на сервере SLURM / совместно используемой памяти.)
  • Если Spark не делает копии, какие шаги я могу предпринять для диагностики реальной проблемы?
  • Использую ли яправильная терминология (водитель, рабочие) правильными способами?Если нет, поправьте меня.

Спасибо!

1 Ответ

0 голосов
/ 06 февраля 2019

Короче говоря, вы не можете.Но, чтобы ответить на ваши вопросы шаг за шагом

Как я могу проверить, сделаны ли копии этой матрицы или нет?

На самом деле существует несколько копий, в обоих сериализованныхи десериализованная форма.Поскольку вы используете сериализованные версии PySpark, в какой-то момент они существуют как на JVM (там, где ваш код не работает), так и на стороне Python.

Если копии сделаны, как я могу сказать Spark не делать копии?

Вы не можете.Spark - это система распределенной обработки, и ее выбор дизайна не имеет ни малейшего смысла для системы с общей памятью.В частности, существует косвенность, вызванная подходом хост-гость, и внутреннее дублирование, вызванное архитектурой супервизор-работник.Наконец, в PySpark появилась еще одна изоляция, где каждый работник использует свой собственный процесс.

Подождите, это еще не все - режим local - это инструмент тестирования, а не готовый к работе механизм (не говоря уже о том, что local даже не параллелен ).

Есть несколько небольших окон, которые могут уменьшить дублирование - распределять данные через файловую систему и использовать структуры данных с отображением в памяти, но на самом деле, просто выберите инструмент, которыйправо на работу, и может полностью использовать ресурсы (особенно с учетом неравномерного доступа к памяти).Искра не одна из них.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...