Я пытаюсь выполнить некоторые операции с фреймом данных pyspark.Фрейм данных выглядит примерно так:
user domain1 domain2 ........ domain100 conversions
abcd 1 0 ........ 0 1
gcea 0 0 ........ 1 0
. . . ........ . .
. . . ........ . .
. . . ........ . .
Код, который я использую, прекрасно работает для меня, чтобы в дальнейшем работать с вышеуказанным фреймом, если фрейм данных небольшой, например, он отлично работает для фрейма данных следующегоshape:
(148457,41)
Но если я увеличу размер фрейма данных, например:
(2184934,324)
, я не смогу перейти вперед, потому что время ожидания ноутбука истекло или выдается сообщение об ошибке тайм-аута сеансакак только я выполню любой код на кадре данных, даже что-то вроде таймаута операции count ().Вот как выглядит сообщение о тайм-ауте:
An error was encountered:
Invalid status code '400' from
https://172.31.12.103:18888/sessions/5/statements/20 with error
payload:
"requirement failed: Session isn't active."
Этот тайм-аут занимает 1 или 2 секунды (не занимает много времени).
Я не использую collect () или любые другиеtopandas () для тайм-аута.То, что я пытаюсь сделать с вышеупомянутым фреймом данных, это недостаточная выборка данных, но я не могу заставить простую операцию .count () работать после увеличения размера фрейма данных.
Я уже пытался использоватьразличные типы экземпляров в моем кластере emr, чтобы он работал.Когда я использую меньший фрейм данных, достаточно экземпляра типа c5.2xlarge, но для большего фрейма данных он не работает, даже если я использую экземпляры c5.18xlarge.У меня есть 1 главный узел и 2 подчиненных узла в моем кластере.
Это то, что я пытаюсь сделать с фреймом данных
#Undersampling.
from pyspark.sql.functions import col, when
def resample(base_features,ratio,class_field,base_class):
pos = base_features.filter(col(class_field)==base_class)
neg = base_features.filter(col(class_field)!=base_class)
total_pos = pos.count()
total_neg = neg.count()
fraction=float(total_pos*ratio)/float(total_neg)
sampled = neg.sample(True,fraction)
a = sampled.union(pos)
return a
undersampled_df = resample(df,10,'conversions',1)
Как я могу решить эту проблему?Любые предложения о том, какие шаги я должен предпринять?