У меня была похожая проблема при чтении графика из файла. Обработка включала вычисление матрицы с плавающей запятой 200 000x200 000 (по одной строке за раз), которая не помещалась в память. Попытка освободить память между вычислениями с использованием gc.collect()
устранила проблему, связанную с памятью, но это привело к проблемам с производительностью: я не знаю, почему, хотя объем используемой памяти оставался постоянным, каждый новый вызов gc.collect()
заняло немного больше времени, чем предыдущий. Так что довольно быстро сборка мусора заняла большую часть времени вычислений.
Чтобы исправить проблемы с памятью и производительностью, я переключился на использование многопоточного трюка, который я однажды где-то прочитал (извините, я больше не могу найти соответствующий пост). Прежде чем я читал каждую строку файла в большом цикле for
, обрабатывал его и запускал gc.collect()
время от времени, чтобы освободить место в памяти. Теперь я вызываю функцию, которая читает и обрабатывает кусок файла в новом потоке. Как только поток заканчивается, память автоматически освобождается без странной проблемы с производительностью.
Практически это работает так:
from dask import delayed # this module wraps the multithreading
def f(storage, index, chunk_size): # the processing function
# read the chunk of size chunk_size starting at index in the file
# process it using data in storage if needed
# append data needed for further computations to storage
return storage
partial_result = delayed([]) # put into the delayed() the constructor for your data structure
# I personally use "delayed(nx.Graph())" since I am creating a networkx Graph
chunk_size = 100 # ideally you want this as big as possible while still enabling the computations to fit in memory
for index in range(0, len(file), chunk_size):
# we indicates to dask that we will want to apply f to the parameters partial_result, index, chunk_size
partial_result = delayed(f)(partial_result, index, chunk_size)
# no computations are done yet !
# dask will spawn a thread to run f(partial_result, index, chunk_size) once we call partial_result.compute()
# passing the previous "partial_result" variable in the parameters assures a chunk will only be processed after the previous one is done
# it also allows you to use the results of the processing of the previous chunks in the file if needed
# this launches all the computations
result = partial_result.compute()
# one thread is spawned for each "delayed" one at a time to compute its result
# dask then closes the tread, which solves the memory freeing issue
# the strange performance issue with gc.collect() is also avoided