Я пытаюсь прочитать большой (не умещающийся в памяти) набор данных паркета, а затем образец из него.Каждый раздел набора данных идеально вписывается в память.
Набор данных содержит около 20 ГБ данных на диске, разделенных на 104 раздела по 200 МБ каждый.Я не хочу использовать более 40 ГБ памяти в любой момент, поэтому я устанавливаю n_workers
и memory_limit
соответственно.
Моя гипотеза состояла в том, что Dask будет загружать столько разделов, сколько сможет обработать, делать выборки из них, удалять их из памяти и затем продолжать загружать следующие.Или что-то в этом роде.
Вместо этого, судя по графику выполнения (104 операции загрузки параллельно, после каждого образца), похоже, что он пытается загрузить все разделы одновременно, и поэтому рабочих продолжают убивать занедостаточно памяти.
Я что-то упустил?
Это мой код:
from datetime import datetime
from dask.distributed import Client
client = Client(n_workers=4, memory_limit=10e9) #Gb per worker
import dask.dataframe as dd
df = dd.read_parquet('/path/to/dataset/')
df = df.sample(frac=0.01)
df = df.compute()
Чтобы воспроизвести ошибку, вы можете создать фиктивный набор данных 1/10размер того, который я пытался загрузить, используя этот код, и попробуйте мой код с 1GB memory_limit=1e9
для компенсации.
from dask.distributed import Client
client = Client() #add restrictions depending on your system here
from dask import datasets
df = datasets.timeseries(end='2002-12-31')
df = df.repartition(npartitions=104)
df.to_parquet('./mock_dataset')