Dask EMR Проблемы с памятью - PullRequest
0 голосов
/ 18 марта 2020

Полагаю, я совершенно не понимаю, как работает Dask на EMR. У меня есть очень большой 1,7 терабайтный файл, состоящий из более чем 1000 столбцов текстовых данных в формате фиксированной ширины. Очевидно, что он слишком велик, чтобы поместиться в память, и я не хочу запускать несколько экземпляров EC2 с высокой памятью (возможно, кластер 10-20 m4.xl). Мне нужно только работать с подмножеством данных. Тем не менее, просто загрузить и обрезать данные в Dask оказывается более сложной задачей, чем я ожидал. У меня сложилось впечатление, что данные могут храниться на диске в сумме, и только то, что нужно в данный момент, потянуто в оперативную память. Разве это не так?

from dask_yarn import YarnCluster
from dask.distributed import Client
import dask.dataframe as dd
import s3fs
import pandas as pd

cluster = YarnCluster()
cluster.adapt(minimum=10)
# Connect to the cluster
client = Client(cluster)

layout=pd.read_csv('s3://bucket/path/layout.csv')
layout['Start']-=1
wlist=list(zip(layout.Start,layout.End))
nlist=list(layout.Field)

fname='s3://bucket/path/file'
df=dd.read_fwf(fname,header=None,colspecs=wlist,names=nlist,dtype=object,na_values=[''],usecols=['x112','x111','x110','x220','x889','x336','x979','x125','x43','ID','AID','XID','LID','CITYNAME','STATECD'],blocksize='250MB')

cnt=df[(df.STATECD=='PA')&(df.CITYNAME=='PITTSBURG')&(df.x112=='S')]
client.persist(cnt)
len(cnt) #<--keeps crashing... Can't get the number of rows of this

Сначала казалось, что все, что я делал, загружалось только на мастер, пока я не поместил аргумент размера блока в read_fwf в dask. Мне удалось сохранить отфильтрованный набор данных 'cnt', и он, похоже, занимает 710 МБ (7 662 задач) в кластере в соответствии с пользовательским интерфейсом dask. Тем не менее, каждый раз, когда я пытаюсь подсчитать количество строк, у которых есть бомбы с помощью «KilledWorker». Я предполагаю, что это проблема памяти. Что я неправильно понимаю или делаю неправильно?

...