У меня есть следующий код в функции:
num_procs.times do
pid = fork
unless pid
DRb.start_service
ts = Rinda::TupleSpaceProxy.new(DRbObject.new_with_uri('druby://localhost:53421'))
loop do
item = ts.take([:enum, nil, nil])
# our termination tuple
break if item == [:enum, -1, nil]
result =
begin
block.call(item[2])
rescue Object => e
e
end
# return result
ts.write([:result, item[1], result])
end
DRb.stop_service
exit!
end
pids << pid
end
pts = Rinda::TupleSpace.new
# write termination tuples
items.size.times do
pts.write([:enum, -1, nil])
end
items.each_with_index { |item, index|
pts.write([:enum, index, item])
}
DRb.start_service('druby://localhost:53421', pts)
# Grab results
items.size.times do
result_tuples << pts.take([:result, nil, nil])
end
pp "Waiting for pids: #{pids.inspect}" if FORKIFY_DEBUG
pids.each { |p| Process.waitpid(p) }
DRb.stop_service
# gather results and sort them
result_tuples.map { |t|
results[t[1]] = t[2]
}
return results
Этот код разветвляется разное количество раз, потом потом пытаются получить кортежи от родителя, используя Rinda :: TupleSpaceProxy через DRb. Родитель помещает элементы для каждого процесса для вызова блока. Затем дети возвращают свои результаты с разными кортежами родителю, который их объединяет.
Этот код находится в библиотеке, поэтому я не хочу, чтобы пользователь запускал что-то вроде beanstalkd или его эквивалента, просто чтобы ставить задачи в очередь для пула процессов. У меня вопрос такой:
Есть ли лучший способ, которым я могу выполнять межпроцессное взаимодействие в очереди?