В Windows запустите в консоли (настройте количество ядер / потоков):
$ set JULIA_NUM_THREADS=4
$ julia
В Linux запустите в консоли:
$ export JULIA_NUM_THREADS=4
$ julia
Теперь проверьте, работает ли оно:
julia> Threads.nthreads()
4
Запустите приведенный ниже код (я обновляю ваш код, чтобы соответствовать Julia 1.0):
using CSV, DataFrames, BenchmarkTools
iris = CSV.read(joinpath(dirname(pathof(DataFrames)),"..","test/data/iris.csv"))
iris.PetalType = iris.PetalWidth .> 2; #add an additional column for testing
Давайте определим некоторую функцию, которая работает с частью DataFrame
function nrow2(df::AbstractDataFrame)
val = nrow(df)
#do something much more complicated...
val
end
Теперь самая сложная часть головоломки:
function par_by(df::AbstractDataFrame,f::Function,cols::Symbol...;block_size=40)
#f needs to be precompiled - we precompile using the first row of the DataFrame.
#If try to do it within @thread macro
#Julia will crash in most ugly and unexpected ways
#if you comment out this line you can observe a different crash with every run
by(view(df,1:1),[cols...],f);
nr = nrow(df)
local dfs = DataFrame()
blocks = Int(ceil(nr/block_size))
s = Threads.SpinLock()
Threads.@threads for block in 1:blocks
startix = (block-1)*block_size+1
endix = min(block*block_size,nr)
rv= by(view(df,startix:endix), [cols...], f)
Threads.lock(s)
if nrow(dfs) == 0
dfs = rv
else
append!(dfs,rv)
end
Threads.unlock(s)
end
dfs
end
Давайте проверим ее и скомпилируем результаты
julia> res = par_by(iris,nrow2,:Species)
6×2 DataFrame
│ Row │ Species │ x1 │
│ │ String │ Int64 │
├─────┼────────────┼───────┤
│ 1 │ versicolor │ 20 │
│ 2 │ virginica │ 20 │
│ 3 │ setosa │ 10 │
│ 4 │ versicolor │ 30 │
│ 5 │ virginica │ 30 │
│ 6 │ setosa │ 40 │
julia> by(res, :Species) do df;DataFrame(x1=sum(df.x1));end
3×2 DataFrame
│ Row │ Species │ x1 │
│ │ String │ Int64 │
├─────┼────────────┼───────┤
│ 1 │ setosa │ 50 │
│ 2 │ versicolor │ 50 │
│ 3 │ virginica │ 50 │
par_by
также поддерживает несколькоколонки
julia> res = par_by(iris,nrow2,:Species,:PetalType)
8×3 DataFrame
│ Row │ Species │ PetalType │ x1 │
│ │ String │ Bool │ Int64 │
├─────┼───────────┼───────────┼───────┤
│ 1 │ setosa │ false │ 40 │
⋮
│ 7 │ virginica │ true │ 13 │
│ 8 │ virginica │ false │ 17 │
@ Bogumił Kamiński отметили, что разумно использовать groupby()
перед темЕсли по какой-либо причине groupby
стоимость не слишком высока (требуется полное сканирование), это рекомендуемый способ - упрощает агрегацию.
ress = DataFrame(Species=String[],count=Int[])
for group in groupby(iris,:Species)
r = par_by(group,nrow2,:Species,block_size=15)
push!(ress,[r.Species[1],sum(r.x1)])
end
julia> ress
3×2 DataFrame
│ Row │ Species │ count │
│ │ String │ Int64 │
├─────┼────────────┼───────┤
│ 1 │ setosa │ 50 │
│ 2 │ versicolor │ 50 │
│ 3 │ virginica │ 50 │
Обратите внимание, что в приведенном выше примере только три группы, поэтому мы распараллеливаемся по каждой группе.Однако, если у вас большое количество групп, вы можете рассмотреть возможность запуска:
function par_by2(df::AbstractDataFrame,f::Function,cols::Symbol...)
res = NamedTuple[]
s = Threads.SpinLock()
groups = groupby(df,[cols...])
f(view(groups[1],1:1));
Threads.@threads for g in 1:length(groups)
rv= f(groups[g])
Threads.lock(s)
key=tuple([groups[g][cc][1] for cc in cols]...)
push!(res,(key=key,val=rv))
Threads.unlock(s)
end
res
end
julia> iris.PetalType = iris.PetalWidth .> 2;
julia> par_by2(iris,nrow2,:Species,:PetalType)
4-element Array{NamedTuple,1}:
(key = ("setosa", false), val = 50)
(key = ("versicolor", false), val = 50)
(key = ("virginica", true), val = 23)
(key = ("virginica", false), val = 27)
Дайте мне знать, если это сработало для вас.Поскольку у большего количества людей может быть похожая проблема, я превращу этот код в пакет Julia (и именно поэтому я сохранил этот код как общий)