Кажется, @distributed работает, возвращение функции шаткое - PullRequest
0 голосов
/ 20 сентября 2018

Я только учусь, как выполнять параллельные вычисления в Джулии.Я использую @sync @distributed в начале 3x вложенного цикла for для распараллеливания вещей (см. Код внизу).Из строки println(errCmp[row, col]) я могу наблюдать за распечаткой всех элементов массива errCmp.Например,

From worker 3:    2.351134946074191e9
From worker 4:    2.3500830193505473e9
From worker 5:    2.3502416529551845e9
From worker 2:    2.3509105625656652e9
From worker 3:    2.3508352842971106e9
From worker 4:    2.3497049296121807e9
From worker 5:    2.35048428351797e9
From worker 2:    2.350742582031195e9
From worker 3:    2.350616273660934e9
From worker 4:    2.349709546599313e9

Однако, когда функция возвращается, errCmp - это массив нулей, который я предварительно выделяю в начале.

Мне не хватает заключительного срока, чтобы собрать все?

function optimizeDragCalc(df::DataFrame)
    paramGrid = [cd*AoM for cd = range(1e-3, stop = 0.01, length = 50), AoM = range(2e-4, stop = 0.0015, length = 50)]
    errCmp    = zeros(size(paramGrid))
    # totalSize = size(paramGrid, 1) * size(paramGrid, 2) * size(df.time, 1)
    @sync @distributed for row = 1:size(paramGrid, 1)
        for col = 1:size(paramGrid, 2)
            # Run the propagation here
            BC = 1/paramGrid[row, col]
            slns, _ = propWholeTraj(df, BC)
            for time = 1:size(df.time, 1)
                errDF = propError(slns[time], df, time)
                errCmp[row, col] += sum(errDF.totalErr)
            end # time
            # println("row: ", row, " of ",size(paramGrid, 1),"   col: ", col, " of ", size(paramGrid, 2))
            println(errCmp[row, col])
        end # col
    end # row
    # plot(heatmap(z = errCmp))
    return errCmp, paramGrid
end
errCmp, paramGrid = @time optimizeDragCalc(df)

1 Ответ

0 голосов
/ 21 сентября 2018

Вы не предоставили минимальный рабочий пример, но я думаю, что это может быть сложно.Так вот мой MWE.Предположим, что мы хотим использовать Distributed для вычисления сумм столбцов Array:

using Distributed
addprocs(2)
@everywhere using StatsBase
data = rand(1000,2000)
res = zeros(2000)
@sync @distributed for col = 1:size(data)[2]
    res[col] = StatsBase.mean(data[:,col])
    # does not work!
    # ... because data is created locally and never returned!
end

Чтобы исправить вышеприведенный код, вам нужно предоставить функцию-агрегатор (я продолжаю примернамеренно упрощено - возможна дальнейшая оптимизация).

using Distributed
addprocs(2)
@everywhere using Distributed,StatsBase
data = rand(1000,2000)    
@everywhere function t2(d1,d2)
    append!(d1,d2)
    d1
end
res = @sync @distributed (t2) for col = 1:size(data)[2]
    [(myid(),col, StatsBase.mean(data[:,col]))]
end

Теперь давайте посмотрим на результат.Мы можем видеть, что некоторые значения были рассчитаны на работника 2, а другие на работника 3:

julia> res
2000-element Array{Tuple{Int64,Int64,Float64},1}:
 (2, 1, 0.49703681326230276)
 (2, 2, 0.5035341367791002)
 (2, 3, 0.5050607022354537)
 ⋮
 (3, 1998, 0.4975699181976122)
 (3, 1999, 0.5009498778934444)
 (3, 2000, 0.499671315490524)

Дальнейшие возможные улучшения / модификации:

  • использование @spawnat для генерации значений в удаленных процессах (вместо основного процесса и их отправки)
  • use SharedArray - это позволяет автоматически распределять данные между работниками.Из моего опыта требуется очень тщательное программирование.
  • Использование ParallelDataTransfer.jl для отправки данных среди рабочих.Очень прост в использовании, неэффективен для огромного количества сообщений.
  • всегда учитывайте механизм потоков Юлии (в некоторых случаях это облегчает жизнь - опять же зависит от проблемы)
...