Время ожидания сеанса ноутбука EMR в течение нескольких секунд (с использованием pyspark) на большом фрейме данных (pyspark) - PullRequest
1 голос
/ 28 марта 2019

Я пытаюсь выполнить некоторые операции с фреймом данных pyspark.Фрейм данных выглядит примерно так:

    user    domain1    domain2 ........ domain100    conversions

    abcd      1          0     ........    0            1
    gcea      0          0     ........    1            0
     .        .          .     ........    .            .
     .        .          .     ........    .            .
     .        .          .     ........    .            .

Код, который я использую, прекрасно работает для меня, чтобы в дальнейшем работать с вышеуказанным фреймом, если фрейм данных небольшой, например, он отлично работает для фрейма данных следующегоshape:

    (148457,41)

Но если я увеличу размер фрейма данных, например:

    (2184934,324)

, я не смогу перейти вперед, потому что время ожидания ноутбука истекло или выдается сообщение об ошибке тайм-аута сеансакак только я выполню любой код на кадре данных, даже что-то вроде таймаута операции count ().Вот как выглядит сообщение о тайм-ауте:

    An error was encountered:
    Invalid status code '400' from 
    https://172.31.12.103:18888/sessions/5/statements/20 with error 
    payload: 
    "requirement failed: Session isn't active."

Этот тайм-аут занимает 1 или 2 секунды (не занимает много времени).

Я не использую collect () или любые другиеtopandas () для тайм-аута.То, что я пытаюсь сделать с вышеупомянутым фреймом данных, это недостаточная выборка данных, но я не могу заставить простую операцию .count () работать после увеличения размера фрейма данных.

Я уже пытался использоватьразличные типы экземпляров в моем кластере emr, чтобы он работал.Когда я использую меньший фрейм данных, достаточно экземпляра типа c5.2xlarge, но для большего фрейма данных он не работает, даже если я использую экземпляры c5.18xlarge.У меня есть 1 главный узел и 2 подчиненных узла в моем кластере.

Это то, что я пытаюсь сделать с фреймом данных

#Undersampling.
from pyspark.sql.functions import col, when
def resample(base_features,ratio,class_field,base_class):
    pos = base_features.filter(col(class_field)==base_class)
    neg = base_features.filter(col(class_field)!=base_class)
    total_pos = pos.count()
    total_neg = neg.count()
    fraction=float(total_pos*ratio)/float(total_neg)
    sampled = neg.sample(True,fraction)
    a = sampled.union(pos)
    return a
undersampled_df = resample(df,10,'conversions',1)

Как я могу решить эту проблему?Любые предложения о том, какие шаги я должен предпринять?

...