Как мне обработать вывод rdd cogroup, который в результате увеличивает объем доступной памяти в spark? - PullRequest
0 голосов
/ 15 января 2019

У меня очень большая структура, которая создается во время выполнения задания, которая имеет структуру:

(Key, (Iterable[Object], Iterable[Object]))

Для каждого ключа мне нужно найти перекрестное произведение обоих повторяемых объектов, чтобы создать другой объект, который затем будет работать, чтобы попытаться найти связь между записями. Имеется десятки тысяч ключей, и оба предметных итерируемых объекта, по большей мере, 1000 элементов. Две пары объектов образуют строку, которая затем вводится в модель ml, прежде чем информация извлекается и затем сокращается.

Проблема, с которой я столкнулся, заключается в том, что модель ожидает информационный кадр, и из-за большого размера всех строк после создания перекрестных продуктов кластеры разумного размера не могут обрабатывать это количество сразу без итераций.

Я бы оценил каждый объект в среднем на 2-3 КБ, поэтому строка из двух объектов будет иметь размер 4-6 КБ ~. Окончательный вывод будет около 200-300 байт, прежде чем уменьшится.

В настоящее время код выглядит примерно так:

val a = rdd[object]  
val b = rdd[object]

def f1 = for a; for b; yield c  

ideal 
def f2 = for a; for b; create c - then pass c into model and yield d


val c = a.cogroup(b)
        .flatMap(f2)
        .reducebykey

Если я запускаю код с f1, с функцией, которая использует только rdds и итерации, задание завершается, как и ожидалось.

Проблема с функцией f2 в настоящее время заключается в том, что она работает только на 1 экземпляре строки, но модель ожидает информационный кадр, поэтому, работая с набором семплов и сохраняя промежуточные шаги, я сделал a.cogroup(b).write.parquet, а затем прочитал его обратно в и передал это через модель. С полным набором данных я не могу сделать a.cogroup(b).write.parquet, так как данные слишком велики. Конечным результатом, однако, должно быть то, что легко помещается в память, как показано путем ввода f1 в плоскую карту.

Я думал, что перед передачей строки в модель я мог бы попытаться распараллелить строку (строка в dataframe [row]) и передать в модель 1 объектный кадр и извлечь данные таким образом, но с помощью Размер данных и желание использовать модель по назначению (пропуская сразу несколько строк), я надеялся на лучший метод.

Есть мысли о том, как структурировать код так, чтобы данные помещались в памяти (как мне разбить большой фрейм данных, если я посмотрю на него как на целое после cogroup)?
В какой момент я объединяю фрейм данных / его части?
Должен ли я пересмотреть свое использование cogroup? (используется из-за итеративного интерфейса)

Еще раз спасибо за ваше время.

...