Как мне управлять рубиновыми потоками, чтобы они закончили всю свою работу? - PullRequest
24 голосов
/ 05 июня 2011

У меня есть вычисление, которое можно разделить на независимые единицы, и способ, которым я сейчас им занимаюсь, заключается в создании фиксированного числа потоков и последующей передаче кусков работы, выполняемой в каждом потоке.Таким образом, в псевдокоде вот как это выглядит

# main thread
work_units.take(10).each {|work_unit| spawn_thread_for work_unit}

def spawn_thread_for(work)
  Thread.new do
    do_some work
    more_work = work_units.pop
    spawn_thread_for more_work unless more_work.nil?
  end
end

По сути, после создания начального числа потоков каждый выполняет некоторую работу, а затем продолжает брать вещи из рабочего стека, пока ничего не останется.Все работает нормально, когда я запускаю что-то в irb, но когда я выполняю скрипт с использованием интерпретатора, вещи не работают так хорошо.Я не уверен, как заставить основной поток ждать, пока вся работа не будет закончена.Есть хороший способ сделать это, или я застрял с выполнением sleep 10 until work_units.empty? в основной теме

Ответы [ 5 ]

32 голосов
/ 10 сентября 2013

В ruby ​​1.9 (и 2.0) вы можете использовать ThreadsWait из stdlib для этой цели:

require 'thread'
require 'thwait'

threads = []
threads << Thread.new { }
threads << Thread.new { }
ThreadsWait.all_waits(*threads)
15 голосов
/ 05 июня 2011

Если вы измените spawn_thread_for, чтобы сохранить ссылку на созданный вами Thread, то вы можете вызвать Thread#join в потоке, чтобы дождаться завершения:

x = Thread.new { sleep 0.1; print "x"; print "y"; print "z" }
a = Thread.new { print "a"; print "b"; sleep 0.2; print "c" }
x.join # Let the threads finish before
a.join # main thread exits...

производит:

abxyzc

(Украдено из документации ri Thread.new. Подробнее см. В документации ri Thread.join.)

Итак, если вы измените spawn_thread_for, чтобы сохранить ссылки на темы, вы можете присоединиться квсе они:

(не проверено, но должно придать вкус)

# main thread
work_units = Queue.new # and fill the queue...

threads = []
10.downto(1) do
  threads << Thread.new do
    loop do
      w = work_units.pop
      Thread::exit() if w.nil?
      do_some_work(w)
    end
  end
end

# main thread continues while work threads devour work

threads.each(&:join)
3 голосов
/ 05 июня 2011

Похоже, вы реплицируете библиотеку Parallel Each ( Peach ).

2 голосов
/ 13 сентября 2016
Thread.list.each{ |t| t.join unless t == Thread.current }
0 голосов
/ 11 июня 2018

Вы можете использовать Тема # присоединиться

присоединиться (p1 = v1) публично

Вызывающий поток приостановит выполнение и запустит thr. Не возвращается до тех пор, пока не выйдет thr или пока не пройдут лимитные секунды. Если срок истекает, возвращается ноль, иначе возвращается thr.

Также вы можете использовать Enumerable # each_slice для перебора рабочих блоков в пакетах

work_units.each_slice(10) do |batch|
  # handle each work unit in a thread
  threads = batch.map do |work_unit|
    spawn_thread_for work_unit
  end

  # wait until current batch work units finish before handling the next batch
  threads.each(&:join)
end
...