Лучший способ хранить результаты многопоточных вызовов функций - PullRequest
0 голосов
/ 01 апреля 2019

У меня есть функция f(), которая возвращает DataFrame, количество строк которого я не знаю заранее. Я звоню f() в многопоточном контексте. Я сохраняю результаты так:

results = [DataFrame() for _ in 1:100]

Threads.@threads for hi in 1:100
    results[hi] = f(df)
end

Когда я запускаю этот код, использование памяти резко возрастает, предположительно потому, что results вынужден постоянно изменять свой размер, когда он получает размер DataFrame [РЕДАКТИРОВАТЬ: это не так]. Каков наилучший способ предварительно выделить массив результатов, чтобы память не взорвалась?

**** ОБНОВЛЕНИЕ с MWE ****

function func(df::DataFrame)
    X = df[:time]
    indices = findall(X .> 0)
end

# read in R data
rds = "blablab.rds"
objs = load(rds);

params = collect(0.5:0.005:0.7);

for i in 1:length(objs)
    cols = [string(name) for name in names(objs.data[i]) if occursin("blabla",string(name))]
    hypers = [(a,b) for a in cols, b in params]

    results = [DataFrame() for _ in 1:length(hypers)]

    # HERE IS WHERE THE MEMORY BLOWS UP
    Threads.@threads for hi in 1:length(hypers)
        name, val = hypers[hi]
        results[hi] = func(objs.data[i])
    end
end

df составляет 0,7 ГБ. Когда я запускаю этот кусок кода, мое использование памяти увеличивается до ~ 30 ГБ !!! Кажется, что доступ к столбцу df внутри func() копирует все это?

1 Ответ

1 голос
/ 01 апреля 2019

Ниже приведены две версии одного и того же кода - однопотоковая и многопоточная, генерирующая DataFrame из набора DataFrame s, возвращаемых функцией f() и имеющих случайную длину.

using Random
using DataFrames
using BenchmarkTools

function f(rngs::Vector{Random.MersenneTwister}, offset)::DataFrame
    t = Threads.threadid()
    n = rand(rngs[t+offset], 1:20)
    DataFrame(a=1:n,b=21:(20+n),t=t+offset)
end

function test_threads(rngs::Vector{Random.MersenneTwister})
    res = DataFrame([Int,Int,Int],[:a,:b,:t],0)
    lock = Threads.SpinLock()
    Threads.@threads for i in 1:100
        df = f(rngs,0)
        Threads.lock(lock)
        append!(res,df)
        Threads.unlock(lock)
    end
    res
end

function test_normal(rngs::Vector{Random.MersenneTwister})    
    res = DataFrame([Int,Int,Int],[:a,:b,:t],0)    
    for i in 1:100
        append!(res,f(rngs, i%2))
    end
    res
end

Теперь давайте проведем тестирование:

julia> rngs = [Random.MersenneTwister(i) for i in 1:2];

julia> @btime test_normal($rngs);

  891.306 μs (5983 allocations: 476.67 KiB)

rngs = [Random.MersenneTwister(i) for i in 1:Threads.nthreads()];

@btime test_threads($rngs);

  674.559 μs (5549 allocations: 425.69 KiB)
...