Существует ли простой способ параллельного запуска DataFrames :: by? - PullRequest
0 голосов
/ 28 сентября 2018

У меня есть большой фрейм данных, который я хочу вычислять параллельно.Вызов, который я хочу распараллелить,

df = by(df, [:Chromosome], some_func)

Есть ли способ легко распараллелить это?Желательно без какого-либо копирования.

Кроме того, я предполагаю, что используемые типы распараллеливания должны отличаться в зависимости от размера групп, созданных с помощью.


Минимальный воспроизводимый пример для использования в ответах:

using DataFrames, CSV, Pkg
iris = CSV.read(joinpath(Pkg.dir("DataFrames"), "test/data/iris.csv"))
iris_count = by(iris, [:Species], nrow)

Ответы [ 2 ]

0 голосов
/ 03 октября 2018

Начните Джулию с julia -p 4, затем запустите

using CSV, DataFrames

iris = CSV.read(joinpath(dirname(pathof(DataFrames)),"..","test/data/iris.csv"))

g = groupby(iris, :Species)

pmap(nrow, [i for i in g])

Это будет работать параллельно для группы.

0 голосов
/ 29 сентября 2018

В Windows запустите в консоли (настройте количество ядер / потоков):

$ set JULIA_NUM_THREADS=4
$ julia

В Linux запустите в консоли:

$ export JULIA_NUM_THREADS=4
$ julia

Теперь проверьте, работает ли оно:

julia> Threads.nthreads()
4

Запустите приведенный ниже код (я обновляю ваш код, чтобы соответствовать Julia 1.0):

using CSV, DataFrames, BenchmarkTools
iris = CSV.read(joinpath(dirname(pathof(DataFrames)),"..","test/data/iris.csv"))
iris.PetalType = iris.PetalWidth .> 2;  #add an additional column for testing

Давайте определим некоторую функцию, которая работает с частью DataFrame

 function nrow2(df::AbstractDataFrame)
     val = nrow(df) 
     #do something much more complicated...
     val
 end

Теперь самая сложная часть головоломки:

function par_by(df::AbstractDataFrame,f::Function,cols::Symbol...;block_size=40)
    #f needs to be precompiled - we precompile using the first row of the DataFrame.
    #If try to do it within @thread macro
    #Julia will crash in most ugly and unexpected ways
    #if you comment out this line you can observe a different crash with every run
    by(view(df,1:1),[cols...],f);

    nr = nrow(df)
    local dfs = DataFrame()
    blocks = Int(ceil(nr/block_size))
    s = Threads.SpinLock()
    Threads.@threads for block in 1:blocks
        startix = (block-1)*block_size+1
        endix = min(block*block_size,nr)
        rv= by(view(df,startix:endix), [cols...], f)
        Threads.lock(s)
        if nrow(dfs) == 0  
            dfs = rv
        else 
            append!(dfs,rv)
        end
        Threads.unlock(s)
    end
    dfs
end

Давайте проверим ее и скомпилируем результаты

julia> res = par_by(iris,nrow2,:Species)
6×2 DataFrame
│ Row │ Species    │ x1    │
│     │ String     │ Int64 │
├─────┼────────────┼───────┤
│ 1   │ versicolor │ 20    │
│ 2   │ virginica  │ 20    │
│ 3   │ setosa     │ 10    │
│ 4   │ versicolor │ 30    │
│ 5   │ virginica  │ 30    │
│ 6   │ setosa     │ 40    │


julia> by(res, :Species) do df;DataFrame(x1=sum(df.x1));end
3×2 DataFrame
│ Row │ Species    │ x1    │
│     │ String     │ Int64 │
├─────┼────────────┼───────┤
│ 1   │ setosa     │ 50    │
│ 2   │ versicolor │ 50    │
│ 3   │ virginica  │ 50    │

par_by также поддерживает несколькоколонки

julia> res = par_by(iris,nrow2,:Species,:PetalType)
8×3 DataFrame
│ Row │ Species   │ PetalType │ x1    │
│     │ String    │ Bool      │ Int64 │
├─────┼───────────┼───────────┼───────┤
│ 1   │ setosa    │ false     │ 40    │
⋮
│ 7   │ virginica │ true      │ 13    │
│ 8   │ virginica │ false     │ 17    │

@ Bogumił Kamiński отметили, что разумно использовать groupby() перед темЕсли по какой-либо причине groupby стоимость не слишком высока (требуется полное сканирование), это рекомендуемый способ - упрощает агрегацию.

 ress = DataFrame(Species=String[],count=Int[])
 for group in groupby(iris,:Species)
     r = par_by(group,nrow2,:Species,block_size=15)
     push!(ress,[r.Species[1],sum(r.x1)])
 end 


 julia> ress
 3×2 DataFrame
 │ Row │ Species    │ count │
 │     │ String     │ Int64 │
 ├─────┼────────────┼───────┤
 │ 1   │ setosa     │ 50    │
 │ 2   │ versicolor │ 50    │
 │ 3   │ virginica  │ 50    │

Обратите внимание, что в приведенном выше примере только три группы, поэтому мы распараллеливаемся по каждой группе.Однако, если у вас большое количество групп, вы можете рассмотреть возможность запуска:

function par_by2(df::AbstractDataFrame,f::Function,cols::Symbol...)
    res = NamedTuple[]
    s = Threads.SpinLock()
    groups = groupby(df,[cols...])
    f(view(groups[1],1:1));
    Threads.@threads for g in 1:length(groups)
        rv= f(groups[g])
        Threads.lock(s)
        key=tuple([groups[g][cc][1] for cc in cols]...)
        push!(res,(key=key,val=rv))
        Threads.unlock(s)
    end
    res
end

julia> iris.PetalType = iris.PetalWidth .> 2;

julia> par_by2(iris,nrow2,:Species,:PetalType)
4-element Array{NamedTuple,1}:
 (key = ("setosa", false), val = 50)
 (key = ("versicolor", false), val = 50)
 (key = ("virginica", true), val = 23)
 (key = ("virginica", false), val = 27)

Дайте мне знать, если это сработало для вас.Поскольку у большего количества людей может быть похожая проблема, я превращу этот код в пакет Julia (и именно поэтому я сохранил этот код как общий)

...