Julia - LoadError на работнике с функцией в файле (основной) - PullRequest
0 голосов
/ 28 марта 2019

Я начинаю пытаться использовать Джулию для параллельной обработки.

Я использую макрос @spawn в этом примере, но у меня была та же ошибка при использовании функции remotecall_fetch.

Ниже приведен код:

function count_proteins(fpath::String)
    cnt::Int = 0
    if !isfile(fpath)
        write(Base.stderr, "FASTA not found!")
    else
        reader = open(FASTA.Reader, fpath)
        for record in reader
            cnt += 1
        end
    end
    # return the count
    cnt
end


"""Count sequences in parallel."""
function parallel_count_proteins(fPaths::Array{String, 1}, threads::Int16=4)    
    # initialize workers
    addprocs(threads)

    fut = Dict{Int, Future}()

    # launch the jobs
    for (i, fastaPath) in enumerate(fPaths)
        r = @spawn count_proteins(fastaPath)
        fut[i] = r
    end

    for (i, res) in fut
        s = fetch(res)
    end
end

### MAIN ###
flist = ["f1", "f2", "f3", "f4"]
threads = Int16(2)
parallel_count_proteins(flist, threads)

Ошибка возникает, когда я пытаюсь получить результаты, используя fetch():

ОШИБКА: LoadError: На рабочем 3

... а вот и трассировка стека:

Stacktrace:
 [1] #remotecall_fetch#149(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Distributed.Worker, ::Distributed.RRID) at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:379
 [2] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:371
 [3] #remotecall_fetch#152 at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:406 [inlined]
 [4] remotecall_fetch at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:406 [inlined]
 [5] call_on_owner at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:479 [inlined]
 [6] fetch(::Future) at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:511
 [7] parallel_count_proteins(::Array{String,1}, ::Int16) at /Users/salvocos/Google_Drive/julia_programming/mcl_graph_to_label.jl:150
 [8] top-level scope at none:0
 [9] include at ./boot.jl:326 [inlined]
 [10] include_relative(::Module, ::String) at ./loading.jl:1038
 [11] include(::Module, ::String) at ./sysimg.jl:29
 [12] exec_options(::Base.JLOptions) at ./client.jl:267
 [13] _start() at ./client.jl:436

Я знаю, что нужно, чтобы все работники знали о важности функции count_proteins, но я совершенно не уверен, как это сделать.

Ответы [ 2 ]

1 голос
/ 28 марта 2019

Как вы сказали, вам нужно сделать count_proteins доступным для всех рабочих процессов.

Вы можете использовать макрос @everywhere перед определениями функций, чтобы сделать их доступными для всех работников. @everywhere выполняет заданное выражение для всех работников.

Другим способом было бы поместить функции, которые должны быть доступными для работников, в другой файл .jl и @everywhere include("my_helper_functions.jl"), или поместить определения функций внутри блока begin...end и поместить @everywhere прямо перед begin и запустить блок. Это нужно сделать после создания рабочих процессов. Помещение таких функций в модуль / пакет и запуск @everywhere using MyModule также должны работать.

Для вашего кода решение будет

# addprocs here before @everywhere definitions
addprocs(2)

@everywhere function count_proteins(fpath::String)
    cnt::Int = 0
    if !isfile(fpath)
        write(Base.stderr, "FASTA not found!")
    else
        reader = open(FASTA.Reader, fpath)
        for record in reader
            cnt += 1
        end
    end
    # return the count
    cnt
end


"""Count sequences in parallel."""
function parallel_count_proteins(fPaths::Array{String, 1})
    fut = Dict{Int, Future}()

    # launch the jobs
    for (i, fastaPath) in enumerate(fPaths)
        r = @spawn count_proteins(fastaPath)
        fut[i] = r
    end

    for (i, res) in fut
        s = fetch(res)
    end
end

### MAIN ###
flist = ["f1", "f2", "f3", "f4"]
parallel_count_proteins(flist)

В качестве примечания: если я понимаю, что вы пытаетесь сделать правильно, вы можете просто использовать вместо этого pmap, который будет посылать задачи одну за другой процессам, эффективно балансируя нагрузку.

Возможно, вам будет полезно прочитать ручную запись , касающуюся доступности кода и данных в параллельных вычислениях, а также раздел «Параллельные вычисления» в целом. Для обеспечения доступности данных существует также пакет под названием ParallelDataTransfer.jl, который значительно упрощает перемещение данных между процессами, если вам это когда-либо понадобится.

0 голосов
/ 01 апреля 2019

Как хорошо объясняет выше @hckr, рабочие должны быть развернуты (используя addprocs (потоки)) перед использованием макроса @everywhere.

@ везде можно вызывать и использовать по-разному и в разных частях программы. В моем случае я загружаю функцию, которую я хочу запустить параллельно из модуля.

Чтобы использовать эту функцию параллельно с основным, я использую @everywhere include("myModule.jl").

Ниже приведен код для MyModule:

module MyModule    
using Distributed
using Printf: @printf
using Base

"""Count sequences in the input FASTA"""
function count_proteins(fpath::String)::Int
    cnt::Int = 0
    #@show fpath
    if !isfile(fpath)
        write(Base.stderr, "\nInput FASTA not found!")
    else
        open(fpath, "r") do ifd
            for ln in eachline(ifd)
                if ln[1] == '>'
                    #println(ln)
                    cnt += 1
                end
            end
        end
    end
    # return the count
    @printf("%s\t%d\n", fpath, cnt)
    cnt
end

"""Count sequences in parallel."""
function parallel_count_proteins(fPaths::Array{String, 1})

    # spawn the jobs
    for (i, fastaPath) in enumerate(fPaths)
        r = @spawn count_proteins(fastaPath)
        # @show r
        s = fetch(r)
    end    
end

И далее main.jl с использованием функции parallel_count_proteins из MyModule.

### main.jl ###
using Base
using Distributed
using Printf: @printf

# add path to the modules directory
push!(LOAD_PATH, dirname(@__FILE__)) # MyModule is in the same directory as main.jl

#### MAIN START ####
# deploy the workers
addprocs(4)
# load modules with multi-core functions
@everywhere include(joinpath(dirname(@__FILE__), "MyModule.jl"))

# paths with 4 input files (all in same dir as main.jl)
flist = ["tv1", "tv2", "tv3", "tv4"]

# count proteins
MyModule.parallel_count_proteins(flist)
...